From 34571a60371e80ca0ccabdc5a841c98b2b0f52bd Mon Sep 17 00:00:00 2001
From: Clay Woods
Date: Thu, 23 Jan 2025 07:25:00 -0800
Subject: [PATCH] #539 Spark schema generation works with record relations

Update Spark schema MDA generation to account for relations between records:
- Updated the relation metamodel to include `column` and `required` fields.
- Added record relation unit tests.
- Spark schema casting and to/from-POJO mapping now work for records with relations.
- Updated documentation to cover the new relation fields.
- Implemented validation for all relation multiplicities except One to Many.
---
 DRAFT_RELEASE_NOTES.md                        |   6 +-
 build-parent/pom.xml                          |   2 +-
 docs/modules/ROOT/pages/record-metamodel.adoc |  13 +-
 .../data/delivery/spark/SparkSchema.java      |  16 ++
 .../element/BaseRecordRelationDecorator.java  |  69 ++++-
 .../aiops/mda/metamodel/element/Relation.java |  14 +
 .../metamodel/element/RelationElement.java    |  15 ++
 .../metamodel/element/spark/SparkRecord.java  |  13 +
 .../element/spark/SparkRecordRelation.java    |  38 +++
 .../spark.schema.base.java.vm                 | 120 ++++++++-
 .../data-delivery-spark/encryption.java.vm    |   2 +-
 .../synchronous.processor.base.java.vm        |   2 +-
 .../test-data-delivery-spark-model/pom.xml    |   2 +
 .../SparkJavaDataDeliveryDictionary.json      |   4 +
 .../src/main/resources/records/Address.json   |   3 +-
 .../src/main/resources/records/City.json      |  27 ++
 .../src/main/resources/records/Mayor.json     |  22 ++
 .../records/PersonWithMToOneRelation.json     |   2 +-
 .../src/main/resources/records/State.json     |  14 +
 .../src/main/resources/records/Street.json    |  29 ++
 .../aiops/mda/pattern/SparkSchemaTest.java    | 247 ++++++++++++++++++
 .../specifications/sparkSchema.feature        |  49 ++++
 22 files changed, 687 insertions(+), 22 deletions(-)
 create mode 100644 foundation/foundation-mda/src/main/java/com/boozallen/aiops/mda/metamodel/element/spark/SparkRecordRelation.java
 create mode 100644 test/test-mda-models/test-data-delivery-spark-model/src/main/resources/records/City.json
 create mode 100644 test/test-mda-models/test-data-delivery-spark-model/src/main/resources/records/Mayor.json
 create mode 100644 test/test-mda-models/test-data-delivery-spark-model/src/main/resources/records/State.json
 create mode 100644 test/test-mda-models/test-data-delivery-spark-model/src/main/resources/records/Street.json
 create mode 100644 test/test-mda-models/test-data-delivery-spark-model/src/test/java/com/boozallen/aiops/mda/pattern/SparkSchemaTest.java
 create mode 100644 test/test-mda-models/test-data-delivery-spark-model/src/test/resources/specifications/sparkSchema.feature

diff --git a/DRAFT_RELEASE_NOTES.md b/DRAFT_RELEASE_NOTES.md
index 0638a399c..b4252d754 100644
--- a/DRAFT_RELEASE_NOTES.md
+++ b/DRAFT_RELEASE_NOTES.md
@@ -13,7 +13,11 @@ Data access through [GraphQL](https://graphql.org/) has been deprecated and repl
 Spark and PySpark have been upgraded from version 3.5.2 to 3.5.4.
 
 ## Record Relation
-To enable nested data records, we have added a new relation feature to the record metamodel. This allows records to reference other records. For more details, refer to the [Record Relation Options].(https://boozallen.github.io/aissemble/aissemble/current-dev/record-metamodel.html#_record_relation_options)
+To enable nested data records, we have added a new relation feature to the record metamodel. This allows records to
+reference other records. For more details, refer to the [Record Relation Options](https://boozallen.github.io/aissemble/aissemble/current-dev/record-metamodel.html#_record_relation_options).
+Several features are still a work in progress:
+- Spark-based validation for records with a One to Many multiplicity. (POJO validation is available.)
+- PySpark schema generation for records with any multiplicity.
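+
+For illustration, a relation is declared directly in a record's JSON metamodel. The following is a trimmed-down
+sketch of the `City` test record added in this change (only its one-to-one `Mayor` relation is shown):
+
+```json
+{
+    "name": "City",
+    "package": "com.boozallen.aiops.mda.pattern.record",
+    "relations": [
+        {
+            "name": "Mayor",
+            "package": "com.boozallen.aiops.mda.pattern.record",
+            "multiplicity": "1-1",
+            "column": "MAYOR",
+            "required": true,
+            "documentation": "There is one mayor in the city"
+        }
+    ]
+}
+```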
 
 # Breaking Changes
 _Note: instructions for adapting to these changes are outlined in the upgrade instructions below._
diff --git a/build-parent/pom.xml b/build-parent/pom.xml
index f8b0afba6..8292dd73e 100644
--- a/build-parent/pom.xml
+++ b/build-parent/pom.xml
@@ -48,7 +48,7 @@
         2.9.1
         4.24.0
         0.10.1
-        3.0.8
+        3.1.12
         2.15.0
         2.6.0
         3.8.6
diff --git a/docs/modules/ROOT/pages/record-metamodel.adoc b/docs/modules/ROOT/pages/record-metamodel.adoc
index a46f063fd..a5e8f70a6 100644
--- a/docs/modules/ROOT/pages/record-metamodel.adoc
+++ b/docs/modules/ROOT/pages/record-metamodel.adoc
@@ -247,13 +247,22 @@ namespacing (e.g., package in Java, namespace in XSD).
 | `relations/relation/documentation`
 | No
 | None
-| A description of the field.
+| A description of the relation.
 
 | `relations/relation/multiplicity`
 | No
 | One to Many (1-M)
 | Defines the multiplicity of the relation. Options are ONE_TO_MANY (1-M), ONE_TO_ONE (1-1), and MANY_TO_ONE (M-1).
 
+| `relations/relation/required`
+| No
+| false
+| Setting `required` to `true` mandates that the relation be populated for the record to pass validation.
+
+| `relations/relation/column`
+| No
+| None
+| The name of the storage field (e.g., table column) to use when persisting the relation.
+
 |===
-\
\ No newline at end of file
+\
diff --git a/extensions/extensions-data-delivery/extensions-data-delivery-spark/src/main/java/com/boozallen/aiops/data/delivery/spark/SparkSchema.java b/extensions/extensions-data-delivery/extensions-data-delivery-spark/src/main/java/com/boozallen/aiops/data/delivery/spark/SparkSchema.java
index 6eec43801..1872f50eb 100644
--- a/extensions/extensions-data-delivery/extensions-data-delivery-spark/src/main/java/com/boozallen/aiops/data/delivery/spark/SparkSchema.java
+++ b/extensions/extensions-data-delivery/extensions-data-delivery-spark/src/main/java/com/boozallen/aiops/data/delivery/spark/SparkSchema.java
@@ -127,6 +127,22 @@ protected void add(String name, DataType dataType, boolean nullable, String comm
         schema = schema.add(name, dataType, nullable, comment);
     }
 
+    /**
+     * Adds a field to the schema.
+     *
+     * @param name
+     *            the name of the field to add
+     * @param structType
+     *            the struct type of the field to add
+     * @param nullable
+     *            whether the field is nullable
+     * @param comment
+     *            a description of the field
+     */
+    protected void add(String name, StructType structType, boolean nullable, String comment) {
+        schema = schema.add(name, structType, nullable, comment);
+    }
+
     /**
      * Updates the data type of a field in the schema.
* diff --git a/foundation/foundation-mda/src/main/java/com/boozallen/aiops/mda/metamodel/element/BaseRecordRelationDecorator.java b/foundation/foundation-mda/src/main/java/com/boozallen/aiops/mda/metamodel/element/BaseRecordRelationDecorator.java index 566edaca0..75a897a52 100644 --- a/foundation/foundation-mda/src/main/java/com/boozallen/aiops/mda/metamodel/element/BaseRecordRelationDecorator.java +++ b/foundation/foundation-mda/src/main/java/com/boozallen/aiops/mda/metamodel/element/BaseRecordRelationDecorator.java @@ -2,7 +2,7 @@ /*- * #%L - * AIOps Foundation::AIOps MDA + * aiSSEMBLE::Foundation::MDA * %% * Copyright (C) 2021 Booz Allen * %% @@ -10,11 +10,14 @@ * #L% */ +import org.apache.commons.lang3.StringUtils; import org.technologybrewery.fermenter.mda.metamodel.element.MetamodelUtils; +import com.boozallen.aiops.mda.generator.util.PipelineUtils; + /** * Provides baseline decorator functionality for {@link Relation}. - * + * * The goal is to make it easier to apply the decorator pattern in various implementations of generators (e.g., Java, * python, Docker) so that each concrete decorator only has to decorate those aspects of the class that are needed, not * all the pass-through methods that each decorator would otherwise need to implement (that add no real value). @@ -59,6 +62,16 @@ public String getName() { return wrapped.getName(); } + @Override + public Boolean isRequired() { + return wrapped.isRequired(); + } + + @Override + public String getColumn() { + return wrapped.getColumn(); + } + @Override public void validate() { wrapped.validate(); @@ -71,4 +84,56 @@ public void validate() { public boolean isOneToManyRelation() { return wrapped.getMultiplicity().equals(Multiplicity.ONE_TO_MANY); } + + /** + * Whether the Spark relation is nullable. + * + * @return true if the Spark field is nullable + */ + public boolean isNullable() { + return wrapped.isRequired() == null || !wrapped.isRequired(); + } + + /** + * Returns the column name for the Spark relation. + * + * @return column name + */ + public String getColumnName() { + String columnName; + + if (StringUtils.isNotBlank(wrapped.getColumn())) { + columnName = wrapped.getColumn(); + } else { + columnName = wrapped.getName(); + } + return columnName; + } + + /** + * Returns the relation name formatted to uppercase with underscores. + * + * @return name formatted to uppercase with underscores + */ + public String getUpperSnakecaseName() { + return PipelineUtils.deriveUpperUnderscoreNameFromUpperCamelName(getName()); + } + + /** + * Returns the relation name, capitalized. + * + * @return capitalized name + */ + public String getCapitalizedName() { + return StringUtils.capitalize(getName()); + } + + /** + * Returns the relation name, uncapitalized. + * + * @return uncapitalized name + */ + public String getUncapitalizeName() { + return StringUtils.uncapitalize(getName()); + } } diff --git a/foundation/foundation-mda/src/main/java/com/boozallen/aiops/mda/metamodel/element/Relation.java b/foundation/foundation-mda/src/main/java/com/boozallen/aiops/mda/metamodel/element/Relation.java index 23e4aeac3..8f5bace8f 100644 --- a/foundation/foundation-mda/src/main/java/com/boozallen/aiops/mda/metamodel/element/Relation.java +++ b/foundation/foundation-mda/src/main/java/com/boozallen/aiops/mda/metamodel/element/Relation.java @@ -33,6 +33,20 @@ public interface Relation extends NamespacedMetamodel { */ Multiplicity getMultiplicity(); + /** + * Returns whether the relation is required. 
+     *
+     * @return required
+     */
+    Boolean isRequired();
+
+    /**
+     * Returns the column of the relation.
+     *
+     * @return column
+     */
+    String getColumn();
+
     /**
      * Enumerated values representing multiplicity options.
      */
diff --git a/foundation/foundation-mda/src/main/java/com/boozallen/aiops/mda/metamodel/element/RelationElement.java b/foundation/foundation-mda/src/main/java/com/boozallen/aiops/mda/metamodel/element/RelationElement.java
index 0204f571f..0e8d00eaf 100644
--- a/foundation/foundation-mda/src/main/java/com/boozallen/aiops/mda/metamodel/element/RelationElement.java
+++ b/foundation/foundation-mda/src/main/java/com/boozallen/aiops/mda/metamodel/element/RelationElement.java
@@ -34,6 +34,11 @@ public class RelationElement extends NamespacedMetamodelElement implements Relat
     @JsonInclude(Include.NON_NULL)
     protected Multiplicity multiplicity;
 
+    @JsonInclude(Include.NON_NULL)
+    protected Boolean required;
+
+    @JsonInclude(Include.NON_NULL)
+    protected String column;
 
     /**
      * {@inheritDoc}
@@ -51,6 +56,16 @@ public Multiplicity getMultiplicity() {
         return multiplicity;
     }
 
+    @Override
+    public Boolean isRequired() {
+        return required;
+    }
+
+    @Override
+    public String getColumn() {
+        return column;
+    }
+
     /**
      * {@inheritDoc}
diff --git a/foundation/foundation-mda/src/main/java/com/boozallen/aiops/mda/metamodel/element/spark/SparkRecord.java b/foundation/foundation-mda/src/main/java/com/boozallen/aiops/mda/metamodel/element/spark/SparkRecord.java
index 308a34287..d51636159 100644
--- a/foundation/foundation-mda/src/main/java/com/boozallen/aiops/mda/metamodel/element/spark/SparkRecord.java
+++ b/foundation/foundation-mda/src/main/java/com/boozallen/aiops/mda/metamodel/element/spark/SparkRecord.java
@@ -15,6 +15,7 @@ import java.util.Set;
 import java.util.TreeSet;
 
+import com.boozallen.aiops.mda.metamodel.element.Relation;
 import com.boozallen.aiops.mda.metamodel.element.java.JavaRecordFieldType;
 import com.boozallen.aiops.mda.metamodel.element.util.JavaElementUtils;
 import org.technologybrewery.fermenter.mda.TypeManager;
@@ -63,6 +64,18 @@ public List getFields() {
         return fields;
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public List<Relation> getRelations() {
+        List<Relation> relations = new ArrayList<>();
+        for (Relation relation : super.getRelations()) {
+            relations.add(new SparkRecordRelation(relation));
+        }
+        return relations;
+    }
+
     /**
      * {@inheritDoc}
      */
diff --git a/foundation/foundation-mda/src/main/java/com/boozallen/aiops/mda/metamodel/element/spark/SparkRecordRelation.java b/foundation/foundation-mda/src/main/java/com/boozallen/aiops/mda/metamodel/element/spark/SparkRecordRelation.java
new file mode 100644
index 000000000..e978f8796
--- /dev/null
+++ b/foundation/foundation-mda/src/main/java/com/boozallen/aiops/mda/metamodel/element/spark/SparkRecordRelation.java
@@ -0,0 +1,38 @@
+package com.boozallen.aiops.mda.metamodel.element.spark;
+
+/*-
+ * #%L
+ * aiSSEMBLE::Foundation::MDA
+ * %%
+ * Copyright (C) 2021 Booz Allen
+ * %%
+ * This software package is licensed under the Booz Allen Public License. All Rights Reserved.
+ * #L%
+ */
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.boozallen.aiops.mda.metamodel.element.BaseRecordRelationDecorator;
+import com.boozallen.aiops.mda.metamodel.element.Relation;
+
+/**
+ * Decorates a record {@link Relation} with Spark-specific functionality.
+ */ +public class SparkRecordRelation extends BaseRecordRelationDecorator { + + /** + * {@inheritDoc} + */ + public SparkRecordRelation(Relation recordRelationToDecorate) { + super(recordRelationToDecorate); + } + + /** + * {@inheritDoc} + */ + @Override + public String getDocumentation() { + return StringUtils.isNotBlank(super.getDocumentation()) ? super.getDocumentation() : ""; + } +} diff --git a/foundation/foundation-mda/src/main/resources/templates/data-delivery-data-records/spark.schema.base.java.vm b/foundation/foundation-mda/src/main/resources/templates/data-delivery-data-records/spark.schema.base.java.vm index 6b05a3f91..99c9963de 100644 --- a/foundation/foundation-mda/src/main/resources/templates/data-delivery-data-records/spark.schema.base.java.vm +++ b/foundation/foundation-mda/src/main/resources/templates/data-delivery-data-records/spark.schema.base.java.vm @@ -12,13 +12,18 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import org.apache.commons.lang3.NotImplementedException; import org.apache.spark.sql.Column; import org.apache.spark.sql.types.DataTypes; import scala.collection.JavaConverters; import scala.collection.Seq; +import scala.collection.mutable.WrappedArray; import com.boozallen.aiops.data.delivery.spark.SparkSchema; import com.boozallen.aiops.data.delivery.spark.SparkDatasetUtils; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import static org.apache.spark.sql.functions.col; import static org.apache.spark.sql.functions.lit; @@ -37,6 +42,11 @@ public abstract class ${record.capitalizedName}SchemaBase extends SparkSchema { #set ($columnVars[$field.name] = "${field.upperSnakecaseName}_COLUMN") public static final String ${columnVars[$field.name]} = "${field.sparkAttributes.columnName}"; #end +#set($relationVars = {}) +#foreach ($relation in $record.relations) + #set ($relationVars[$relation.name] = "${relation.upperSnakecaseName}_COLUMN") + public static final String ${relationVars[$relation.name]} = "${relation.columnName}"; +#end public ${record.capitalizedName}SchemaBase() { super(); @@ -47,10 +57,17 @@ public abstract class ${record.capitalizedName}SchemaBase extends SparkSchema { #else add(${columnVars[$field.name]}, ${field.shortType}, ${field.sparkAttributes.isNullable()}, "${field.description}"); #end + #end + #foreach ($relation in $record.relations) + #if ($relation.isOneToManyRelation()) + add(${relationVars[$relation.name]}, DataTypes.createArrayType(new ${relation.name}Schema().getStructType()), ${relation.isNullable()}, "${relation.documentation}"); + #else + add(${relationVars[$relation.name]}, new ${relation.name}Schema().getStructType(), ${relation.isNullable()}, "${relation.documentation}"); + #end #end } - #if ($record.hasFields()) + #if (($record.hasFields()) || ($record.hasRelations())) /** * Casts the given dataset to this schema. 
* @@ -62,10 +79,16 @@ public abstract class ${record.capitalizedName}SchemaBase extends SparkSchema { #foreach ($field in $record.fields) DataType ${field.name}Type = getDataType(${columnVars[$field.name]}); #end + #foreach ($relation in $record.relations) + DataType ${relation.uncapitalizeName}Type = getDataType(${relationVars[$relation.name]}); + #end return dataset #foreach ($field in $record.fields) - .withColumn(${columnVars[$field.name]}, col(${columnVars[$field.name]}).cast(${field.name}Type))#if (!$foreach.hasNext);#end + .withColumn(${columnVars[$field.name]}, col(${columnVars[$field.name]}).cast(${field.name}Type))#if((!$foreach.hasNext) && ($relationVars == {}));#end + #end + #foreach ($relation in $record.relations) + .withColumn(${relationVars[$relation.name]}, col(${relationVars[$relation.name]}).cast(${relation.uncapitalizeName}Type))#if(!$foreach.hasNext);#end #end } #end @@ -90,34 +113,56 @@ public abstract class ${record.capitalizedName}SchemaBase extends SparkSchema { /*** * Appends spark validation logic to an unvalidated spark DataFrame (org.apache.spark.sql.Dataset) + * + * @param data the spark Dataset to perform validation against * @return Dataset with appended validation logic */ public Dataset validateDataFrame(Dataset data) { + return validateDataFrame(data, ""); + } + + /*** + * Appends spark validation logic to an unvalidated spark DataFrame (org.apache.spark.sql.Dataset) + * + * @param data the spark Dataset to perform validation against + * @param columnPrefix a prefix to append to the dataset columns that validation is being performed on + * @return Dataset with appended validation logic + */ + protected Dataset validateDataFrame(Dataset data, String columnPrefix) { + #set($hasOneToManyRelation = 0) + #foreach($relation in $record.relations) + #if($relation.isOneToManyRelation()) + #set($hasOneToManyRelation = 1) + #end + #end + #if($hasOneToManyRelation) + throw new NotImplementedException("Validation against relations with One to Many multiplicity is not yet implemented"); + #else Dataset dataWithValidations = data #foreach ($field in $record.fields) #if (${field.isRequired()}) - .withColumn(${columnVars[$field.name]} + "_IS_NOT_NULL", col(${columnVars[$field.name]}).isNotNull()) + .withColumn(${columnVars[$field.name]} + "_IS_NOT_NULL", col(columnPrefix + ${columnVars[$field.name]}).isNotNull()) #end #if (${field.getValidation().getMinValue()}) - .withColumn(${columnVars[$field.name]} + "_GREATER_THAN_MIN", col(${columnVars[$field.name]}).gt(lit(${field.getValidation().getMinValue()})).or(col(${columnVars[$field.name]}).equalTo(lit(${field.getValidation().getMinValue()})))) + .withColumn(${columnVars[$field.name]} + "_GREATER_THAN_MIN", col(columnPrefix + ${columnVars[$field.name]}).gt(lit(${field.getValidation().getMinValue()})).or(col(columnPrefix + ${columnVars[$field.name]}).equalTo(lit(${field.getValidation().getMinValue()})))) #end #if (${field.getValidation().getMaxValue()}) - .withColumn(${columnVars[$field.name]} + "_LESS_THAN_MAX", col(${columnVars[$field.name]}).lt(lit(${field.getValidation().getMaxValue()})).or(col(${columnVars[$field.name]}).equalTo(lit(${field.getValidation().getMaxValue()})))) + .withColumn(${columnVars[$field.name]} + "_LESS_THAN_MAX", col(columnPrefix + ${columnVars[$field.name]}).lt(lit(${field.getValidation().getMaxValue()})).or(col(columnPrefix + ${columnVars[$field.name]}).equalTo(lit(${field.getValidation().getMaxValue()})))) #end #if (${field.getValidation().getScale()}) - .withColumn(${columnVars[$field.name]} + 
"_MATCHES_SCALE", col(${columnVars[$field.name]}).rlike(("^[0-9]*(?:\\.[0-9]{0,${field.getValidation().getScale()}})?$"))) + .withColumn(${columnVars[$field.name]} + "_MATCHES_SCALE", col(columnPrefix + ${columnVars[$field.name]}).rlike(("^[0-9]*(?:\\.[0-9]{0,${field.getValidation().getScale()}})?$"))) #end #if (${field.getValidation().getMinLength()}) - .withColumn(${columnVars[$field.name]} + "_GREATER_THAN_MAX_LENGTH", col(${columnVars[$field.name]}).rlike(("^.{${field.getValidation().getMinLength()},}"))) + .withColumn(${columnVars[$field.name]} + "_GREATER_THAN_MAX_LENGTH", col(columnPrefix + ${columnVars[$field.name]}).rlike(("^.{${field.getValidation().getMinLength()},}"))) #end #if (${field.getValidation().getMaxLength()}) - .withColumn(${columnVars[$field.name]} + "_LESS_THAN_MAX_LENGTH", col(${columnVars[$field.name]}).rlike(("^.{${field.getValidation().getMaxLength()},}")).equalTo(lit(false))) + .withColumn(${columnVars[$field.name]} + "_LESS_THAN_MAX_LENGTH", col(columnPrefix + ${columnVars[$field.name]}).rlike(("^.{${field.getValidation().getMaxLength()},}")).equalTo(lit(false))) #end #foreach ($format in $field.getValidation().getFormats()) #if ($foreach.first) - .withColumn(${columnVars[$field.name]} + "_MATCHES_FORMAT", col(${columnVars[$field.name]}).rlike(("$format.replace("\","\\")")) + .withColumn(${columnVars[$field.name]} + "_MATCHES_FORMAT", col(columnPrefix +${columnVars[$field.name]}).rlike(("$format.replace("\","\\")")) #else - .or(col(${columnVars[$field.name]}).rlike(("$format.replace("\","\\")"))) + .or(col(columnPrefix + ${columnVars[$field.name]}).rlike(("$format.replace("\","\\")"))) #end #if ($foreach.last) ) @@ -125,6 +170,15 @@ public abstract class ${record.capitalizedName}SchemaBase extends SparkSchema { #end #end ; + #foreach($relation in $record.relations) + #if($relation.isOneToManyRelation()) + ## TODO implement validation for records with one to Many relations + #else + ${relation.capitalizedName}Schema ${relation.uncapitalizeName}Schema = new ${relation.capitalizedName}Schema(); + dataWithValidations = dataWithValidations.withColumn(${relationVars[$relation.name]} + "_VALID", lit(!${relation.uncapitalizeName}Schema.validateDataFrame(data.select(col(${relationVars[$relation.name]})), ${relationVars[$relation.name]} + ".").isEmpty())); + #end + #end + Column filterSchema = null; List validationColumns = new ArrayList<>(); Collections.addAll(validationColumns, dataWithValidations.columns()); @@ -148,6 +202,7 @@ public abstract class ${record.capitalizedName}SchemaBase extends SparkSchema { validData = validData.drop(columnsToDrop); return validData; + #end } /** @@ -159,14 +214,34 @@ public abstract class ${record.capitalizedName}SchemaBase extends SparkSchema { return RowFactory.create( #foreach ($field in $record.fields) #if ($field.type.dictionaryType.isComplex()) - record.get${field.capitalizedName}() != null ? record.get${field.capitalizedName}().getValue() : null#if ($foreach.hasNext),#end + record.get${field.capitalizedName}() != null ? record.get${field.capitalizedName}().getValue() : null#if (($foreach.hasNext) || ($record.hasRelations())),#end + #else + record.get${field.capitalizedName}()#if (($foreach.hasNext) || ($record.hasRelations())),#end + #end + #end + #foreach ($relation in $record.relations) + #if ($relation.isOneToManyRelation()) + record.get${relation.capitalizedName}() != null ? 
${relation.capitalizedName}Schema.asRows(record.get${relation.capitalizedName}()) : null#if ($foreach.hasNext),#end
     #else
-            record.get${field.capitalizedName}()#if ($foreach.hasNext),#end
+            record.get${relation.capitalizedName}() != null ? ${relation.capitalizedName}Schema.asRow(record.get${relation.capitalizedName}()) : null#if ($foreach.hasNext),#end
     #end
 #end
         );
     }
 
+    /**
+     * Returns a list of given records as Spark dataset rows.
+     *
+     * @param records
+     *            the records to convert
+     * @return the list of records as Spark dataset rows
+     */
+    public static List<Row> asRows(List<${record.capitalizedName}> records) {
+        List<Row> rows = new ArrayList<>();
+        for (${record.capitalizedName} item : records) {
+            rows.add(asRow(item));
+        }
+        return rows;
+    }
+
     /**
      * Returns a given Spark dataset row as a record.
@@ -183,7 +258,28 @@
             record.set${field.capitalizedName}(${field.name}Value);
         #end
     #end
+    #foreach ($relation in $record.relations)
+        #if ($relation.isOneToManyRelation())
+        Row[] ${relation.uncapitalizeName}Rows = (Row[]) ((WrappedArray) SparkDatasetUtils.getRowValue(row, "${relation.columnName}")).array();
+        record.set${relation.capitalizedName}(${relation.capitalizedName}Schema.mapRows(Arrays.asList(${relation.uncapitalizeName}Rows)));
+        #else
+        ${relation.capitalizedName} ${relation.uncapitalizeName}Value = ${relation.capitalizedName}Schema.mapRow((Row) SparkDatasetUtils.getRowValue(row, "${relation.columnName}"));
+        record.set${relation.capitalizedName}(${relation.uncapitalizeName}Value);
+        #end
+    #end
         return record;
     }
+
+    /**
+     * Returns a given list of Spark dataset rows as a list of records.
+     *
+     * @param rows
+     *            the rows to convert
+     * @return the rows as a list of records
+     */
+    public static List<${record.capitalizedName}> mapRows(List<Row> rows) {
+        List<${record.capitalizedName}> records = new ArrayList<>();
+        for (Row row : rows) {
+            records.add(mapRow(row));
+        }
+        return records;
+    }
 }
diff --git a/foundation/foundation-mda/src/main/resources/templates/data-delivery-spark/encryption.java.vm b/foundation/foundation-mda/src/main/resources/templates/data-delivery-spark/encryption.java.vm
index df1a40bb9..770145cf0 100644
--- a/foundation/foundation-mda/src/main/resources/templates/data-delivery-spark/encryption.java.vm
+++ b/foundation/foundation-mda/src/main/resources/templates/data-delivery-spark/encryption.java.vm
@@ -111,4 +111,4 @@ ${step.encryptionSignature} {
         return datasetWithEncryptionPolicyApplied;
     #end
 #end
-}
\ No newline at end of file
+}
diff --git a/foundation/foundation-mda/src/main/resources/templates/data-delivery-spark/synchronous.processor.base.java.vm b/foundation/foundation-mda/src/main/resources/templates/data-delivery-spark/synchronous.processor.base.java.vm
index 517ea13aa..22d2659d6 100644
--- a/foundation/foundation-mda/src/main/resources/templates/data-delivery-spark/synchronous.processor.base.java.vm
+++ b/foundation/foundation-mda/src/main/resources/templates/data-delivery-spark/synchronous.processor.base.java.vm
@@ -404,4 +404,4 @@ public abstract class ${step.capitalizedName}Base extends AbstractPipelineStep {
         this.$store.lowerName = fileStore;
     }
 #end
-}
\ No newline at end of file
+}
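[Reviewer note, not part of the patch: for orientation, the updated template above emits schema classes roughly like the following hand-written, abridged sketch for the `City` test record added below. It assumes the analogous generated `MayorSchema`/`StreetSchema` companions; the template itself remains the source of truth.]

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;

import com.boozallen.aiops.data.delivery.spark.SparkSchema;

import static org.apache.spark.sql.functions.col;

public abstract class CitySchemaBase extends SparkSchema {

    public static final String MAYOR_COLUMN = "MAYOR";
    public static final String STREET_COLUMN = "STREET";

    public CitySchemaBase() {
        super();
        // 1-1 and M-1 relations become a nested StructType column, while 1-M
        // relations become an ArrayType of the related record's StructType.
        // A required relation is added as a non-nullable column.
        add(MAYOR_COLUMN, new MayorSchema().getStructType(), false, "There is one mayor in the city");
        add(STREET_COLUMN, DataTypes.createArrayType(new StreetSchema().getStructType()), true, "");
    }

    public Dataset<Row> castDataFrame(Dataset<Row> dataset) {
        // Relation columns are cast alongside plain fields.
        DataType mayorType = getDataType(MAYOR_COLUMN);
        DataType streetType = getDataType(STREET_COLUMN);
        return dataset
                .withColumn(MAYOR_COLUMN, col(MAYOR_COLUMN).cast(mayorType))
                .withColumn(STREET_COLUMN, col(STREET_COLUMN).cast(streetType));
    }
}
```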
diff --git a/test/test-mda-models/test-data-delivery-spark-model/pom.xml b/test/test-mda-models/test-data-delivery-spark-model/pom.xml
index 191b76e7b..80937bc62 100644
--- a/test/test-mda-models/test-data-delivery-spark-model/pom.xml
+++ b/test/test-mda-models/test-data-delivery-spark-model/pom.xml
@@ -34,10 +34,12 @@
 test/java/com/boozallen/aiops/mda/pattern/RecordTest.java
 test/java/com/boozallen/aiops/mda/pattern/LineageTest.java
 test/java/com/boozallen/aiops/mda/pattern/RelationTest.java
+test/java/com/boozallen/aiops/mda/pattern/SparkSchemaTest.java
 test/resources/config/
 test/resources/specifications/record.feature
 test/resources/specifications/lineage.feature
 test/resources/specifications/relation.feature
+test/resources/specifications/sparkSchema.feature
 test/resources/krausening/base/
 test/resources/apps
diff --git a/test/test-mda-models/test-data-delivery-spark-model/src/main/resources/dictionaries/SparkJavaDataDeliveryDictionary.json b/test/test-mda-models/test-data-delivery-spark-model/src/main/resources/dictionaries/SparkJavaDataDeliveryDictionary.json
index 5685d10f3..bab14e772 100644
--- a/test/test-mda-models/test-data-delivery-spark-model/src/main/resources/dictionaries/SparkJavaDataDeliveryDictionary.json
+++ b/test/test-mda-models/test-data-delivery-spark-model/src/main/resources/dictionaries/SparkJavaDataDeliveryDictionary.json
@@ -84,6 +84,10 @@
         {
             "name": "binarydata",
             "simpleType": "bytearray"
+        },
+        {
+            "name": "string",
+            "simpleType": "string"
         }
     ]
 }
diff --git a/test/test-mda-models/test-data-delivery-spark-model/src/main/resources/records/Address.json b/test/test-mda-models/test-data-delivery-spark-model/src/main/resources/records/Address.json
index 3b0e33eb4..04a7a4a7e 100644
--- a/test/test-mda-models/test-data-delivery-spark-model/src/main/resources/records/Address.json
+++ b/test/test-mda-models/test-data-delivery-spark-model/src/main/resources/records/Address.json
@@ -5,6 +5,7 @@
     "fields": [
         {
             "name": "street",
+            "required": true,
             "type": {
                 "name": "street",
                 "package": "com.boozallen.aiops.mda.pattern.dictionary"
@@ -32,4 +33,4 @@
             }
         }
     ]
-}
\ No newline at end of file
+}
diff --git a/test/test-mda-models/test-data-delivery-spark-model/src/main/resources/records/City.json b/test/test-mda-models/test-data-delivery-spark-model/src/main/resources/records/City.json
new file mode 100644
index 000000000..f911d5a80
--- /dev/null
+++ b/test/test-mda-models/test-data-delivery-spark-model/src/main/resources/records/City.json
@@ -0,0 +1,27 @@
+{
+    "name": "City",
+    "package": "com.boozallen.aiops.mda.pattern.record",
+    "description": "Example City record for testing Spark Java Data Relations",
+    "relations": [
+        {
+            "name": "Mayor",
+            "package": "com.boozallen.aiops.mda.pattern.record",
+            "multiplicity": "1-1",
+            "column": "MAYOR",
+            "required": true,
+            "documentation": "There is one mayor in the city"
+        },
+        {
+            "name": "State",
+            "package": "com.boozallen.aiops.mda.pattern.record",
+            "multiplicity": "M-1",
+            "column": "STATE"
+        },
+        {
+            "name": "Street",
+            "package": "com.boozallen.aiops.mda.pattern.record",
+            "multiplicity": "1-M",
+            "column": "STREET"
+        }
+    ]
+}
diff --git a/test/test-mda-models/test-data-delivery-spark-model/src/main/resources/records/Mayor.json b/test/test-mda-models/test-data-delivery-spark-model/src/main/resources/records/Mayor.json
new file mode 100644
index 000000000..636622c44
--- /dev/null
+++ b/test/test-mda-models/test-data-delivery-spark-model/src/main/resources/records/Mayor.json
@@ -0,0 +1,22 @@
+{
+    "name": "Mayor",
+    "package": "com.boozallen.aiops.mda.pattern.record",
+    "description": "Example Mayor record for testing Spark Java Data Relations",
+    "fields": [
+        {
+            "name": "name",
+            "type": {
+                "name": "string",
+                "package": "com.boozallen.aiops.mda.pattern.dictionary"
+            }
+        },
+        {
+            "name": "integerValidation",
+            "type": {
+                "name": 
"integerWithValidation", + "package": "com.boozallen.aiops.mda.pattern.dictionary" + }, + "column": "int_v8n" + } + ] +} diff --git a/test/test-mda-models/test-data-delivery-spark-model/src/main/resources/records/PersonWithMToOneRelation.json b/test/test-mda-models/test-data-delivery-spark-model/src/main/resources/records/PersonWithMToOneRelation.json index a9e1e3701..12226ec4b 100644 --- a/test/test-mda-models/test-data-delivery-spark-model/src/main/resources/records/PersonWithMToOneRelation.json +++ b/test/test-mda-models/test-data-delivery-spark-model/src/main/resources/records/PersonWithMToOneRelation.json @@ -19,4 +19,4 @@ "documentation":"Many to One Relation between Person and Address" } ] -} \ No newline at end of file +} diff --git a/test/test-mda-models/test-data-delivery-spark-model/src/main/resources/records/State.json b/test/test-mda-models/test-data-delivery-spark-model/src/main/resources/records/State.json new file mode 100644 index 000000000..2a0fce643 --- /dev/null +++ b/test/test-mda-models/test-data-delivery-spark-model/src/main/resources/records/State.json @@ -0,0 +1,14 @@ +{ + "name": "State", + "package": "com.boozallen.aiops.mda.pattern.record", + "description": "Example State record for testing Spark Java Data Relations", + "fields": [ + { + "name": "name", + "type": { + "name": "string", + "package": "com.boozallen.aiops.mda.pattern.dictionary" + } + } + ] +} diff --git a/test/test-mda-models/test-data-delivery-spark-model/src/main/resources/records/Street.json b/test/test-mda-models/test-data-delivery-spark-model/src/main/resources/records/Street.json new file mode 100644 index 000000000..a6e227368 --- /dev/null +++ b/test/test-mda-models/test-data-delivery-spark-model/src/main/resources/records/Street.json @@ -0,0 +1,29 @@ +{ + "name": "Street", + "package": "com.boozallen.aiops.mda.pattern.record", + "description": "Example Street record for testing Spark Java Data Relations", + "fields": [ + { + "name": "name", + "type": { + "name": "string", + "package": "com.boozallen.aiops.mda.pattern.dictionary" + } + }, + { + "name": "county", + "type": { + "name": "string", + "package": "com.boozallen.aiops.mda.pattern.dictionary" + } + }, + { + "name": "integerValidation", + "type": { + "name": "integerWithValidation", + "package": "com.boozallen.aiops.mda.pattern.dictionary" + }, + "column": "int_v8n" + } + ] +} diff --git a/test/test-mda-models/test-data-delivery-spark-model/src/test/java/com/boozallen/aiops/mda/pattern/SparkSchemaTest.java b/test/test-mda-models/test-data-delivery-spark-model/src/test/java/com/boozallen/aiops/mda/pattern/SparkSchemaTest.java new file mode 100644 index 000000000..f93925295 --- /dev/null +++ b/test/test-mda-models/test-data-delivery-spark-model/src/test/java/com/boozallen/aiops/mda/pattern/SparkSchemaTest.java @@ -0,0 +1,247 @@ +package com.boozallen.aiops.mda.pattern; + +/*- + * #%L + * aiSSEMBLE::Test::MDA::Data Delivery Spark + * %% + * Copyright (C) 2021 Booz Allen + * %% + * This software package is licensed under the Booz Allen Public License. All Rights Reserved. 
+ * #L%
+ */
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import com.boozallen.aiops.mda.pattern.dictionary.IntegerWithValidation;
+import com.boozallen.aiops.mda.pattern.record.Address;
+import com.boozallen.aiops.mda.pattern.record.City;
+import com.boozallen.aiops.mda.pattern.record.CitySchema;
+import com.boozallen.aiops.mda.pattern.record.Mayor;
+import com.boozallen.aiops.mda.pattern.record.MayorSchema;
+import com.boozallen.aiops.mda.pattern.record.PersonWithMToOneRelation;
+import com.boozallen.aiops.mda.pattern.record.PersonWithMToOneRelationSchema;
+import com.boozallen.aiops.mda.pattern.record.PersonWithOneToOneRelation;
+import com.boozallen.aiops.mda.pattern.record.PersonWithOneToOneRelationSchema;
+import com.boozallen.aiops.mda.pattern.record.State;
+import com.boozallen.aiops.mda.pattern.record.Street;
+
+import io.cucumber.java.Before;
+import io.cucumber.java.en.Given;
+import io.cucumber.java.en.Then;
+import io.cucumber.java.en.When;
+
+public class SparkSchemaTest {
+    CitySchema citySchema;
+    PersonWithOneToOneRelationSchema personWithOneToOneRelationSchema;
+    PersonWithMToOneRelationSchema personWithMToOneRelationSchema;
+    SparkSession spark;
+    Dataset<Row> cityDataSet;
+    Dataset<Row> personWithOneToOneRelationDataSet;
+    Dataset<Row> personWithMToOneRelationDataSet;
+    Dataset<Row> validatedDataSet;
+    Exception exception;
+
+    @Before("@SparkSchema")
+    public void setUp() {
+        this.spark = SparkTestHarness.getSparkSession();
+    }
+
+    @Given("the record \"City\" exists with the following relations")
+    public void theRecordExistsWithTheFollowingRelations(Map<String, String> multiplicity) {
+        // Handled with MDA generation
+    }
+
+    @Given("the spark schema is generated for the \"PersonWithOneToOneRelation\" record")
+    public void theSparkSchemaIsGeneratedForThePersonWithOneToOneRelationRecord() {
+        this.personWithOneToOneRelationSchema = new PersonWithOneToOneRelationSchema();
+    }
+
+    @Given("a {string} \"PersonWithOneToOneRelation\" dataSet exists")
+    public void aValidPersonWithOneToOneRelationDataSetExists(String validity) {
+        PersonWithOneToOneRelation personWithOneToOneRelation = new PersonWithOneToOneRelation();
+        if (StringUtils.equals("valid", validity)) {
+            personWithOneToOneRelation.setAddress(createAddress());
+        } else {
+            Address address = createAddress();
+            address.setStreet(null);
+            personWithOneToOneRelation.setAddress(address);
+        }
+
+        List<Row> rows = Collections.singletonList(PersonWithOneToOneRelationSchema.asRow(personWithOneToOneRelation));
+        this.personWithOneToOneRelationDataSet = spark.createDataFrame(rows,
+                this.personWithOneToOneRelationSchema.getStructType());
+    }
+
+    @Given("the spark schema is generated for the \"PersonWithMToOneRelation\" record")
+    public void theSparkSchemaIsGeneratedForThePersonWithMToOneRelationRecord() {
+        this.personWithMToOneRelationSchema = new PersonWithMToOneRelationSchema();
+    }
+
+    @Given("a {string} \"PersonWithMToOneRelation\" dataSet exists")
+    public void aValidPersonWithManyToOneRelationDataSetExists(String validity) {
+        PersonWithMToOneRelation personWithMToOneRelation = new PersonWithMToOneRelation();
+        if (StringUtils.equals("valid", validity)) {
+            personWithMToOneRelation.setAddress(createAddress());
+        } else {
+            Address address = createAddress();
+            address.setStreet(null);
+            personWithMToOneRelation.setAddress(address);
+        }
+
+        List<Row> rows = Collections.singletonList(PersonWithMToOneRelationSchema.asRow(personWithMToOneRelation));
+        this.personWithMToOneRelationDataSet = spark.createDataFrame(rows,
+                this.personWithMToOneRelationSchema.getStructType());
+    }
+
+    @Given("a valid \"City\" dataSet exists")
+    public void aValidDataSetExists() {
+        List<Row> rows = Collections.singletonList(CitySchema.asRow(createCity()));
+        this.cityDataSet = spark.createDataFrame(rows, this.citySchema.getStructType());
+    }
+
+    @Given("a \"City\" dataSet with an invalid relation exists")
+    public void aCityDataSetWithAnInvalidRelationExists() {
+        IntegerWithValidation integerWithValidation = new IntegerWithValidation(0);
+        Mayor mayor = new Mayor();
+        mayor.setName("Sam");
+        mayor.setIntegerValidation(integerWithValidation);
+
+        City city = createCity();
+        city.setMayor(mayor);
+        List<Row> rows = Collections.singletonList(CitySchema.asRow(city));
+        this.cityDataSet = spark.createDataFrame(rows, this.citySchema.getStructType());
+    }
+
+    @When("the spark schema is generated for the \"City\" record")
+    public void theSparkSchemaIsGeneratedForTheCityRecord() {
+        this.citySchema = new CitySchema();
+    }
+
+    @When("a \"City\" POJO is mapped to a spark dataset using the schema")
+    public void aSparkDatasetExists() {
+        City expectedCity = createCity();
+        List<Row> cityRows = Collections.singletonList(CitySchema.asRow(expectedCity));
+
+        this.cityDataSet = this.spark.createDataFrame(cityRows, this.citySchema.getStructType());
+    }
+
+    @When("spark schema validation is performed on the dataSet")
+    public void sparkSchemaValidationIsPerformedOnTheDataSet() {
+        try {
+            this.validatedDataSet = this.citySchema.validateDataFrame(this.cityDataSet);
+        } catch (Exception e) {
+            this.exception = e;
+        }
+    }
+
+    @When("spark schema validation is performed on the \"PersonWithOneToOneRelation\" dataSet")
+    public void sparkSchemaValidationIsPerformedOnThePersonWithOneToOneRelationDataSet() {
+        try {
+            this.validatedDataSet = this.personWithOneToOneRelationSchema.validateDataFrame(this.personWithOneToOneRelationDataSet);
+        } catch (Exception e) {
+            this.exception = e;
+        }
+    }
+
+    @When("spark schema validation is performed on the \"PersonWithMToOneRelation\" dataSet")
+    public void sparkSchemaValidationIsPerformedOnThePersonWithMToOneRelationDataSet() {
+        try {
+            this.validatedDataSet =
+                    this.personWithMToOneRelationSchema.validateDataFrame(this.personWithMToOneRelationDataSet);
+        } catch (Exception e) {
+            this.exception = e;
+        }
+    }
+
+    @Then("the schema data type for {string} is {string}")
+    public void theSchemaDataTypeForIs(String record, String type) {
+        assertEquals("The data type for the record is not correct", type,
+                this.citySchema.getDataType(record.toUpperCase()).toString());
+    }
+
+    @Then("the dataset has the correct values for the relational objects")
+    public void aPOJOCanBeMappedToASparkRow() {
+        City expectedCity = createCity();
+        for (Row row : this.cityDataSet.collectAsList()) {
+            City actualCity = CitySchema.mapRow(row);
+            assertEquals("City did not map correctly. Incorrect number of Street relations",
+                    expectedCity.getStreet().size(), actualCity.getStreet().size());
+            assertEquals("City did not map correctly. Incorrect Street relation",
+                    expectedCity.getStreet().get(0).toJson(), actualCity.getStreet().get(0).toJson());
+            assertEquals("City did not map correctly. Incorrect Mayor relation", expectedCity.getMayor().toJson(),
+                    actualCity.getMayor().toJson());
+            assertEquals("City did not map correctly. Incorrect State relation", expectedCity.getState().toJson(),
+                    actualCity.getState().toJson());
+        }
+    }
+
+    @Then("the validation fails with NotImplementedException")
+    public void theValidationFailsWithNotImplementedException() {
+        assertNotNull("No exception was thrown", this.exception);
+        assertTrue("Thrown exception is not an instance of NotImplementedException",
+                this.exception instanceof NotImplementedException);
+    }
+
+    @Then("the dataSet validation {string}")
+    public void theDataSetValidationIsSuccessful(String succeed) {
+        if (StringUtils.equals("fails", succeed)) {
+            assertTrue("Validation passed when it should have failed", validatedDataSet.isEmpty());
+        } else {
+            assertNotNull("Validation failed when it should have passed", validatedDataSet);
+            assertFalse("Validation failed when it should have passed", validatedDataSet.isEmpty());
+        }
+    }
+
+    private City createCity() {
+        IntegerWithValidation integerWithValidation = new IntegerWithValidation(0);
+
+        List<Street> streets = new ArrayList<>();
+        Street street = new Street();
+        street.setName("Street 1 Name");
+        street.setCounty("County 1 Name");
+        street.setIntegerValidation(integerWithValidation);
+        streets.add(street);
+        Street street2 = new Street();
+        street2.setName("Street 2 Name");
+        street2.setCounty("County 2 Name");
+        street2.setIntegerValidation(integerWithValidation);
+        streets.add(street2);
+
+        State state = new State();
+        state.setName("Maryland");
+
+        Mayor mayor = new Mayor();
+        mayor.setName("Sam");
+        mayor.setIntegerValidation(integerWithValidation);
+
+        City city = new City();
+        city.setStreet(streets);
+        city.setMayor(mayor);
+        city.setState(state);
+        return city;
+    }
+
+    private Address createAddress() {
+        Address address = new Address();
+        address.setZipcode(12);
+        address.setCity("City");
+        address.setState("State");
+        address.setStreet("Street");
+        return address;
+    }
+}
diff --git a/test/test-mda-models/test-data-delivery-spark-model/src/test/resources/specifications/sparkSchema.feature b/test/test-mda-models/test-data-delivery-spark-model/src/test/resources/specifications/sparkSchema.feature
new file mode 100644
index 000000000..e2771d07c
--- /dev/null
+++ b/test/test-mda-models/test-data-delivery-spark-model/src/test/resources/specifications/sparkSchema.feature
@@ -0,0 +1,49 @@
+@SparkSchema
+Feature: Records with relations are generated correctly and function as expected
+
+  Background:
+    Given the record "City" exists with the following relations
+      | multiplicity | record |
+      | 1-1          | Mayor  |
+      | 1-M          | Street |
+      | M-1          | State  |
+
+  Scenario Outline: Spark schemas generated are able to get the correct data types
+    When the spark schema is generated for the "City" record
+    Then the schema data type for "<record>" is "<type>"
+    Examples:
+      | record | type |
+      | Mayor  | StructType(StructField(name,StringType,true),StructField(int_v8n,IntegerType,true)) |
+      | Street | ArrayType(StructType(StructField(name,StringType,true),StructField(county,StringType,true),StructField(int_v8n,IntegerType,true)),true) |
+      | State  | StructType(StructField(name,StringType,true)) |
+
+  Scenario: Spark schemas generated have working to and from POJO functionality
+    Given the spark schema is generated for the "City" record
a "City" POJO is mapped to a spark dataset using the schema + Then the dataset has the correct values for the relational objects + + Scenario: Spark schemas generated fails to validate with not yet implemented exception + Given the spark schema is generate for the "City" record + And a valid "City" dataSet exists + When spark schema validation is performed on the dataSet + Then the validation fails with NotYetImplementedException + + Scenario Outline: Records with a One to One relation can be validated using the spark schema + Given the spark schema is generate for the "PersonWithOneToOneRelation" record + And a "" "PersonWithOneToOneRelation" dataSet exists + When spark schema validation is performed on the "PersonWithOneToOneRelation" dataSet + Then the dataSet validation "" + Examples: + | validity | success | + | valid | passes | + | invalid | fails | + + Scenario Outline: Records with a Many to one relation can be validated using the spark schema + Given the spark schema is generate for the "PersonWithMToOneRelation" record + And a "" "PersonWithMToOneRelation" dataSet exists + When spark schema validation is performed on the "PersonWithMToOneRelation" dataSet + Then the dataSet validation "" + Examples: + | validity | success | + | valid | passes | + | invalid | fails |