diff --git a/buildSrc/build.gradle.kts b/buildSrc/build.gradle.kts index 6502e0e724..afb36f0443 100644 --- a/buildSrc/build.gradle.kts +++ b/buildSrc/build.gradle.kts @@ -9,10 +9,10 @@ repositories { } dependencies { - implementation("org.jetbrains.kotlin.jvm:org.jetbrains.kotlin.jvm.gradle.plugin:2.1.0") + implementation("org.jetbrains.kotlin.jvm:org.jetbrains.kotlin.jvm.gradle.plugin:2.1.10") implementation("com.diffplug.spotless:spotless-plugin-gradle:7.0.2") - implementation("io.freefair.gradle:lombok-plugin:8.11") + implementation("io.freefair.gradle:lombok-plugin:8.12") implementation("net.ltgt.gradle:gradle-errorprone-plugin:4.0.1") implementation("com.adarshr:gradle-test-logger-plugin:4.0.0") - implementation("org.pkl-lang:org.pkl-lang.gradle.plugin:0.27.1") + implementation("org.pkl-lang:org.pkl-lang.gradle.plugin:0.27.2") } diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 47a08acf24..cbd01738aa 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -1,14 +1,14 @@ [versions] jackson = "2.18.2" slf4j = "2.0.16" -elasticsearch = "8.17.0" +elasticsearch = "8.17.1" freemarker = "2.3.34" classgraph = "4.8.179" testng = "7.10.2" log4j = "2.24.3" wiremock = "3.10.0" jnanoid = "2.0.0" -awssdk = "2.30.2" +awssdk = "2.30.7" gcs = "26.51.0" # was: "26.53.0" system-stubs = "2.1.7" fastcsv = "3.4.0" @@ -22,14 +22,14 @@ commons-io = "2.18.0" sqlite = "3.48.0.0" jakarta-mail = "2.1.3" angus-mail = "2.0.3" -pkl = "0.27.1" +pkl = "0.27.2" adls = "12.22.0" azure = "1.15.0" guava = "33.4.0-jre" openlineage = "1.27.0" -kotlin = "2.1.0" +kotlin = "2.1.10" kotlin-mu = "3.0.5" -rocksdb = "9.8.4" +rocksdb = "9.10.0" jetty = "12.0.16" netty = "4.1.117.Final" otel = "1.42.1" # was: "1.45.0" @@ -89,6 +89,6 @@ otel = [ "otel-sdk", "otel-exporter", "otel-autoconfig", "otel-appender" ] poi = [ "apache-poi", "apache-poi-ooxml" ] [plugins] -shadow = { id = "com.gradleup.shadow", version = "9.0.0-beta4" } +shadow = { id = "com.gradleup.shadow", 
version = "9.0.0-beta5" } git-publish = { id = "org.ajoberstar.git-publish", version = "4.2.2" } -pkl = { id = "org.pkl-lang", version = "0.27.1" } +pkl = { id = "org.pkl-lang", version = "0.27.2" } diff --git a/package-toolkit/config/src/main/resources/Renderers.pkl b/package-toolkit/config/src/main/resources/Renderers.pkl index dcee7bd2ef..2c685ac5bf 100644 --- a/package-toolkit/config/src/main/resources/Renderers.pkl +++ b/package-toolkit/config/src/main/resources/Renderers.pkl @@ -311,6 +311,7 @@ const function getLoggingConfPy(): FileOutput = new FileOutput { [formatter_jsonFormatter] format=%(asctime)s - %(name)s - %(levelname)s - %(message)s class=pyatlan.utils.JsonFormatter + """ } @@ -322,15 +323,18 @@ const function getRequirementsPy(): FileOutput = new FileOutput { opentelemetry-sdk==1.29.0 opentelemetry-instrumentation-logging==0.50b0 opentelemetry-exporter-otlp==1.29.0 + """ } /// Render the baseline requirements-dev.txt for a Python custom package. const function getRequirementsDevPy(): FileOutput = new FileOutput { text = """ + black pytest pytest-order nanoid + """ } @@ -355,6 +359,7 @@ const function getDockerfilePy(pkgName: String): FileOutput = new FileOutput { WORKDIR /app ENTRYPOINT ["/usr/local/bin/dumb-init", "--"] + """ } @@ -373,7 +378,7 @@ const function getConfigClassPy(m: Config): FileOutput = new FileOutput { validate_multiselect, validate_connector_and_connection, ) - from typing import Any, Optional, Union, Dict + from typing import Any, Optional, Union, Dict, List import logging.config PARENT = Path(__file__).parent @@ -472,6 +477,7 @@ const function getConfigClassPy(m: Config): FileOutput = new FileOutput { if field_value := getattr(self, key): ret_val[value["env"]] = field_value.json() return ret_val + """ }.join("\n") } diff --git a/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/serde/csv/CSVXformer.kt b/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/serde/csv/CSVXformer.kt index fcd23322d3..6dc7608f84 100644 --- 
a/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/serde/csv/CSVXformer.kt +++ b/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/serde/csv/CSVXformer.kt @@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicLong */ abstract class CSVXformer( private val inputFile: String, - private val targetHeader: Iterable?, + val targetHeader: Iterable?, private val logger: KLogger, private val fieldSeparator: Char = ',', ) : Closeable, @@ -138,6 +138,20 @@ abstract class CSVXformer( } } + /** + * Run the transformation and produce the output into the specified file. + * Note: when using this method, it is your responsibility to first output the header into the writer. + * (No header will ever be included via this method.) + * + * @param writer CSV writer into which the transformed CSV output will be written. + */ + fun transform(writer: CsvWriter) { + val start = System.currentTimeMillis() + logger.info { "Transforming $inputFile..." } + mapWithoutHeader(writer) + logger.info { "Total transformation time: ${System.currentTimeMillis() - start} ms" } + } + /** * Actually run the transformation. * @@ -146,6 +160,15 @@ abstract class CSVXformer( private fun map(writer: CsvWriter) { // Start by outputting the header row in the target CSV file writer.writeRecord(targetHeader) + mapWithoutHeader(writer) + } + + /** + * Actually run the transformation, not including any header. + * + * @param writer into which to write each transformed row of data + */ + private fun mapWithoutHeader(writer: CsvWriter) { // Calculate total number of rows that need to be transformed... 
val filteredRowCount = AtomicLong(0) counter.stream().skip(1).forEach { row -> diff --git a/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/util/DeltaProcessor.kt b/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/util/DeltaProcessor.kt index cf8fa48841..92836eabaa 100644 --- a/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/util/DeltaProcessor.kt +++ b/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/util/DeltaProcessor.kt @@ -62,6 +62,14 @@ class DeltaProcessor( */ fun calculate() { if (semantic == "full") { + if (preprocessedDetails.multipleConnections) { + throw IllegalStateException( + """ + Assets in multiple connections detected in the input file. + Full delta processing currently only works for a single connection per input file, exiting. + """.trimIndent(), + ) + } if (qualifiedNamePrefix.isNullOrBlank()) { logger.warn { "Unable to determine qualifiedName prefix, cannot calculate any delta." } } else { @@ -219,12 +227,14 @@ class DeltaProcessor( * @param hasLinks whether there are any links in the input file * @param hasTermAssignments whether there are any term assignments in the input file * @param preprocessedFile full path to the preprocessed input file + * @param multipleConnections whether multiple connections were present in the input file (true) or only a single connection (false) */ open class Results( val assetRootName: String, hasLinks: Boolean, hasTermAssignments: Boolean, val preprocessedFile: String, + val multipleConnections: Boolean = false, ) : RowPreprocessor.Results( hasLinks = hasLinks, hasTermAssignments = hasTermAssignments, diff --git a/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/util/FileBasedDelta.kt b/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/util/FileBasedDelta.kt index 1ab6892baa..a408775f06 100644 --- a/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/util/FileBasedDelta.kt +++ b/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/util/FileBasedDelta.kt @@ -271,7 +271,11 @@ 
class FileBasedDelta( val deletionType = if (purge) AtlanDeleteType.PURGE else AtlanDeleteType.SOFT val guidList = guidsToDeleteToDetails.entrySet() val totalToDelete = guidsToDeleteToDetails.size - logger.info { " --- Deleting ($deletionType) $totalToDelete assets across $removeTypes... ---" } + if (removeTypes.isNotEmpty()) { + logger.info { " --- Deleting ($deletionType) $totalToDelete assets (limited to types: $removeTypes)... ---" } + } else { + logger.info { " --- Deleting ($deletionType) $totalToDelete assets... ---" } + } val currentCount = AtomicLong(0) if (totalToDelete < DELETION_BATCH) { if (totalToDelete > 0) { diff --git a/samples/packages/asset-import/src/main/kotlin/AssetImportCfg.kt b/samples/packages/asset-import/src/main/kotlin/AssetImportCfg.kt index 98677f7d46..953bc70f6e 100644 --- a/samples/packages/asset-import/src/main/kotlin/AssetImportCfg.kt +++ b/samples/packages/asset-import/src/main/kotlin/AssetImportCfg.kt @@ -20,6 +20,11 @@ data class AssetImportCfg( @JsonProperty("assets_prefix") val assetsPrefix: String = "", @JsonProperty("assets_key") val assetsKey: String = "", @JsonProperty("assets_upsert_semantic") val assetsUpsertSemantic: String = "update", + @JsonProperty("assets_delta_semantic") val assetsDeltaSemantic: String = "delta", + @JsonProperty("assets_delta_removal_type") val assetsDeltaRemovalType: String = "archive", + @JsonProperty("assets_delta_reload_calculation") val assetsDeltaReloadCalculation: String = "all", + @JsonProperty("assets_previous_file_direct") val assetsPreviousFileDirect: String = "", + @JsonProperty("assets_previous_file_prefix") val assetsPreviousFilePrefix: String = "", @JsonProperty("assets_config") val assetsConfig: String? 
= null, @JsonDeserialize(using = WidgetSerde.MultiSelectDeserializer::class) @JsonSerialize(using = WidgetSerde.MultiSelectSerializer::class) diff --git a/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/AssetImporter.kt b/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/AssetImporter.kt index 9f3f4e32f7..baa9d8cd72 100644 --- a/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/AssetImporter.kt +++ b/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/AssetImporter.kt @@ -202,7 +202,13 @@ import com.atlan.pkg.serde.csv.CSVPreprocessor import com.atlan.pkg.serde.csv.CSVXformer import com.atlan.pkg.serde.csv.ImportResults import com.atlan.pkg.serde.csv.RowPreprocessor +import com.atlan.pkg.util.AssetResolver +import com.atlan.pkg.util.AssetResolver.QualifiedNameDetails +import com.atlan.pkg.util.DeltaProcessor +import com.atlan.util.AssetBatch.AssetIdentity +import com.atlan.util.StringUtils import mu.KLogger +import java.io.IOException /** * Import assets into Atlan from a provided CSV file. @@ -213,11 +219,13 @@ import mu.KLogger * asset in Atlan, then add that column's field to getAttributesToOverwrite. 
* * @param ctx context in which the package is running + * @param delta the processor containing any details about file deltas * @param filename name of the file to import * @param logger through which to write log entries */ class AssetImporter( ctx: PackageContext, + private val delta: DeltaProcessor?, filename: String, logger: KLogger, ) : CSVImporter( @@ -264,8 +272,7 @@ class AssetImporter( .stream() .filter { it.endDef1.type == it.endDef2.type } .forEach { cyclicalRelationships.getOrPut(it.endDef1.type) { mutableSetOf() }.add(RelationshipEnds(it.name, it.endDef1.name, it.endDef2.name)) } - val results = super.preprocess(outputFile, outputHeaders) - return results + return super.preprocess(outputFile, outputHeaders) } /** {@inheritDoc} */ @@ -391,12 +398,22 @@ class AssetImporter( typeIdx: Int, qnIdx: Int, ): Boolean { - return if (updateOnly) { - // If we are only updating, process in-parallel, in any order - row.size >= typeIdx && CSVXformer.trimWhitespace(row.getOrElse(typeIdx) { "" }).isNotBlank() + val candidateRow = + if (updateOnly) { + // If we are only updating, process in-parallel, in any order + row.size >= typeIdx && CSVXformer.trimWhitespace(row.getOrElse(typeIdx) { "" }).isNotBlank() + } else { + // If we are doing more than only updates, process the assets in top-down order + row.size >= typeIdx && CSVXformer.trimWhitespace(row.getOrElse(typeIdx) { "" }) == typeToProcess + } + // Only proceed processing this candidate row if we're doing non-delta processing, or we have + // detected that it needs to be loaded via the delta processing + return if (candidateRow) { + delta?.resolveAsset(row, header)?.let { identity -> + delta.reloadAsset(identity) + } ?: true } else { - // If we are doing more than only updates, process the assets in top-down order - return row.size >= typeIdx && CSVXformer.trimWhitespace(row.getOrElse(typeIdx) { "" }) == typeToProcess + false } } @@ -405,7 +422,8 @@ class AssetImporter( val types: List, ) - companion object { + 
companion object : AssetResolver { + const val NO_CONNECTION_QN = "NO_CONNECTION_FOUND" private val ordering = listOf( TypeGrouping( @@ -814,12 +832,42 @@ class AssetImporter( types.sortedBy { t -> ordering.flatMap { it.types }.indexOf(t).takeIf { it >= 0 } ?: Int.MAX_VALUE } + + /** {@inheritDoc} */ + override fun resolveAsset( + values: List, + header: List, + connectionsMap: Map, + ): AssetIdentity { + val typeIdx = header.indexOf(Asset.TYPE_NAME.atlanFieldName) + if (typeIdx < 0) { + throw IOException( + "Unable to find the column 'typeName'. This is a mandatory column in the input CSV.", + ) + } + val qnIdx = header.indexOf(Asset.QUALIFIED_NAME.atlanFieldName) + if (qnIdx < 0) { + throw IOException( + "Unable to find the column 'qualifiedName'. This is a mandatory column in the input CSV.", + ) + } + val typeName = CSVXformer.trimWhitespace(values[typeIdx]) + val qualifiedName = CSVXformer.trimWhitespace(values[qnIdx]) + return AssetIdentity(typeName, qualifiedName) + } + + /** {@inheritDoc} */ + override fun getQualifiedNameDetails( + row: List, + header: List, + typeName: String, + ): QualifiedNameDetails = throw IllegalStateException("This method should never be called. Please raise an issue if you discover this in any log file.") } /** Pre-process the assets import file. 
*/ private fun preprocess(): Results = Preprocessor(filename, fieldSeparator, logger).preprocess() - private class Preprocessor( + class Preprocessor( originalFile: String, fieldSeparator: Char, logger: KLogger, @@ -829,6 +877,7 @@ class AssetImporter( fieldSeparator = fieldSeparator, ) { private val typesInFile = mutableSetOf() + private var connectionQNs = mutableSetOf() /** {@inheritDoc} */ override fun preprocessRow( @@ -842,6 +891,16 @@ class AssetImporter( if (typeName.isNotBlank()) { typesInFile.add(row[typeIdx]) } + val qualifiedName = CSVXformer.trimWhitespace(row.getOrNull(header.indexOf(Asset.QUALIFIED_NAME.atlanFieldName)) ?: "") + val connectionQNFromAsset = StringUtils.getConnectionQualifiedName(qualifiedName) + if (connectionQNFromAsset != null) { + connectionQNs.add(connectionQNFromAsset) + } else if (typeName == Connection.TYPE_NAME) { + // If the qualifiedName comes back as null and the asset itself is a connection, add it + connectionQNs.add(qualifiedName) + } else { + throw IllegalStateException("Found an asset without a valid qualifiedName (of type $typeName): $qualifiedName") + } return row } @@ -849,23 +908,31 @@ class AssetImporter( override fun finalize( header: List, outputFile: String?, - ): RowPreprocessor.Results { + ): DeltaProcessor.Results { val results = super.finalize(header, outputFile) return Results( + connectionQN = if (connectionQNs.isNotEmpty()) connectionQNs.first() else NO_CONNECTION_QN, + multipleConnections = connectionQNs.size > 1, hasLinks = results.hasLinks, hasTermAssignments = results.hasTermAssignments, + outputFile = outputFile ?: filename, typesInFile = typesInFile, ) } } - private class Results( + class Results( + connectionQN: String, + multipleConnections: Boolean, hasLinks: Boolean, hasTermAssignments: Boolean, + outputFile: String, val typesInFile: Set, - ) : RowPreprocessor.Results( + ) : DeltaProcessor.Results( + assetRootName = connectionQN, hasLinks = hasLinks, hasTermAssignments = hasTermAssignments, - 
outputFile = null, + multipleConnections = multipleConnections, + preprocessedFile = outputFile, ) } diff --git a/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/Importer.kt b/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/Importer.kt index 09de7ce78b..a8c4e6d0f3 100644 --- a/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/Importer.kt +++ b/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/Importer.kt @@ -7,6 +7,7 @@ import com.atlan.pkg.PackageContext import com.atlan.pkg.Utils import com.atlan.pkg.serde.FieldSerde import com.atlan.pkg.serde.csv.ImportResults +import com.atlan.pkg.util.DeltaProcessor import kotlin.system.exitProcess /** @@ -16,6 +17,8 @@ import kotlin.system.exitProcess object Importer { private val logger = Utils.getLogger(this.javaClass.name) + const val PREVIOUS_FILES_PREFIX = "csa-asset-import" + @JvmStatic fun main(args: Array) { val outputDirectory = if (args.isEmpty()) "tmp" else args[0] @@ -77,13 +80,50 @@ object Importer { ctx.config.assetsKey, ) FieldSerde.FAIL_ON_ERRORS.set(ctx.config.assetsFailOnErrors) - logger.info { "=== Importing assets... 
===" } - val assetImporter = AssetImporter(ctx, assetsInput, logger) - val includes = assetImporter.preprocess() - if (includes.hasLinks) { + val previousFileDirect = ctx.config.assetsPreviousFileDirect + val preprocessedDetails = + AssetImporter + .Preprocessor(assetsInput, ctx.config.assetsFieldSeparator[0], logger) + .preprocess() + if (preprocessedDetails.hasLinks) { ctx.linkCache.preload() } - assetImporter.import() + DeltaProcessor( + ctx = ctx, + semantic = ctx.config.assetsDeltaSemantic, + qualifiedNamePrefix = preprocessedDetails.assetRootName, + removalType = ctx.config.assetsDeltaRemovalType, + previousFilesPrefix = ctx.config.assetsPreviousFilePrefix.ifBlank { PREVIOUS_FILES_PREFIX }, + resolver = AssetImporter, + preprocessedDetails = preprocessedDetails, + typesToRemove = emptyList(), + logger = logger, + reloadSemantic = ctx.config.assetsDeltaReloadCalculation, + previousFilePreprocessor = + AssetImporter.Preprocessor( + previousFileDirect, + ctx.config.assetsFieldSeparator[0], + logger, + ), + outputDirectory = outputDirectory, + ).use { delta -> + + delta.calculate() + + logger.info { "=== Importing assets... 
===" } + val assetImporter = AssetImporter(ctx, delta, assetsInput, logger) + assetImporter.preprocess() // Note: we still do this to detect any cyclical relationships + val importedAssets = assetImporter.import() + + delta.processDeletions() + + // Note: we won't close the original set of changes here, as we'll combine it later for a full set of changes + // (at which point, it will be closed) + ImportResults.getAllModifiedAssets(ctx.client, false, importedAssets).use { modifiedAssets -> + delta.updateConnectionCache(modifiedAssets) + } + importedAssets + } } else { null } @@ -116,14 +156,6 @@ object Importer { } else { null } - - ImportResults.getAllModifiedAssets(ctx.client, false, resultsAssets).use { allModified -> - Utils.updateConnectionCache( - client = ctx.client, - added = allModified, - fallback = outputDirectory, - ) - } return ImportResults.combineAll(ctx.client, true, resultsGTC, resultsDDP, resultsAssets) } } diff --git a/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/ProductImporter.kt b/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/ProductImporter.kt index 46f373b32f..1be6bdabc2 100644 --- a/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/ProductImporter.kt +++ b/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/ProductImporter.kt @@ -68,10 +68,13 @@ class ProductImporter( override fun getBuilder(deserializer: RowDeserializer): Asset.AssetBuilder<*, *> { val name = deserializer.getValue(DataProduct.NAME.atlanFieldName) as String val dataDomainMinimal = deserializer.getValue(DataProduct.DATA_DOMAIN.atlanFieldName)?.let { it as DataDomain } - val dataDomain = if (dataDomainMinimal != null) ctx.dataDomainCache.getByGuid(dataDomainMinimal.guid) as DataDomain else null + if (dataDomainMinimal == null) { + throw NoSuchElementException("No dataDomain provided for the data product, cannot be processed.") + } + val dataDomain = ctx.dataDomainCache.getByGuid(dataDomainMinimal.guid) ?: throw 
NoSuchElementException("dataDomain not found for the data product, cannot be processed: ${deserializer.getRawValue(DataProduct.DATA_DOMAIN.atlanFieldName)}") val dataProductAssetsDSL = deserializer.getValue(DataProduct.DATA_PRODUCT_ASSETS_DSL.atlanFieldName) as String? val qualifiedName = generateQualifiedName(deserializer, dataDomain) - val candidateDP = DataProduct.creator(name, dataDomain?.qualifiedName, dataProductAssetsDSL) + val candidateDP = DataProduct.creator(name, dataDomain.qualifiedName, dataProductAssetsDSL) return if (qualifiedName != getCacheId(deserializer, dataDomain)) { // If there is an existing qualifiedName, use it, otherwise we will get a conflict exception candidateDP.qualifiedName(qualifiedName) diff --git a/samples/packages/asset-import/src/main/resources/package.pkl b/samples/packages/asset-import/src/main/resources/package.pkl index c224ceaefa..b05aa3df91 100644 --- a/samples/packages/asset-import/src/main/resources/package.pkl +++ b/samples/packages/asset-import/src/main/resources/package.pkl @@ -92,6 +92,53 @@ uiConfig { helpText = "Whether to allow the creation of new assets from the input CSV (full or partial assets), or ensure assets are only updated if they already exist in Atlan." fallback = default } + ["assets_delta_semantic"] = new Radio { + title = "Delta handling" + required = false + possibleValues { + ["full"] = "Full replacement" + ["delta"] = "Incremental" + } + default = "delta" + helpText = "Whether to treat the input file as an initial load, full replacement (deleting any existing assets not in the file) or only incremental (no deletion of existing assets)." + fallback = default + } + ["assets_delta_removal_type"] = new Radio { + title = "Removal type" + required = false + possibleValues { + ["archive"] = "Archive (recoverable)" + ["purge"] = "Purge (cannot be recovered)" + } + default = "archive" + helpText = "How to delete any assets not found in the latest file." 
+ fallback = default + } + ["assets_delta_reload_calculation"] = new Radio { + title = "Reload which assets" + required = false + possibleValues { + ["all"] = "All assets" + ["changes"] = "Changed assets only" + } + default = "all" + helpText = "Which assets to reload from the latest input CSV file. Changed assets only will calculate which assets have changed between the files and only attempt to reload those changes." + fallback = default + } + ["assets_previous_file_direct"] = new TextInput { + title = "Previous file" + required = false + hide = true + helpText = "Path to a direct file (locally) to use for delta processing." + fallback = "" + } + ["assets_previous_file_prefix"] = new TextInput { + title = "Previous files location" + required = false + hide = true + helpText = "Object store prefix in which previous files exist for delta processing." + fallback = "" + } ["assets_config"] = new Radio { title = "Options" required = true @@ -353,6 +400,10 @@ uiConfig { whenInputs { ["import_type"] = "CLOUD" } required { "cloud_source" "assets_prefix" "assets_key" "glossaries_prefix" "glossaries_key" "data_products_prefix" "data_products_key" } } + new UIRule { + whenInputs { ["assets_delta_semantic"] = "full" } + required { "assets_delta_removal_type" "assets_delta_reload_calculation" } + } new UIRule { whenInputs { ["assets_config"] = "advanced" } required { diff --git a/samples/packages/asset-import/src/test/kotlin/com/atlan/pkg/aim/CreateThenUpDeltaAIMTest.kt b/samples/packages/asset-import/src/test/kotlin/com/atlan/pkg/aim/CreateThenUpDeltaAIMTest.kt new file mode 100644 index 0000000000..cb63bd0316 --- /dev/null +++ b/samples/packages/asset-import/src/test/kotlin/com/atlan/pkg/aim/CreateThenUpDeltaAIMTest.kt @@ -0,0 +1,419 @@ +/* SPDX-License-Identifier: Apache-2.0 + Copyright 2023 Atlan Pte. Ltd. 
*/ +package com.atlan.pkg.aim + +import AssetImportCfg +import com.atlan.model.assets.Asset +import com.atlan.model.assets.Connection +import com.atlan.model.assets.Database +import com.atlan.model.assets.MaterializedView +import com.atlan.model.assets.Schema +import com.atlan.model.assets.Table +import com.atlan.model.assets.View +import com.atlan.model.enums.AtlanConnectorType +import com.atlan.model.enums.AtlanStatus +import com.atlan.model.fields.AtlanField +import com.atlan.pkg.PackageTest +import com.atlan.pkg.Utils +import com.atlan.pkg.cache.PersistentConnectionCache +import org.testng.Assert.assertFalse +import org.testng.Assert.assertTrue +import java.nio.file.Paths +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertNotNull + +/** + * Test creation of assets followed by an upsert of the same assets, including calculating a delta. + */ +class CreateThenUpDeltaAIMTest : PackageTest("ctuda") { + override val logger = Utils.getLogger(this.javaClass.name) + + private val conn1 = makeUnique("c1") + private val conn1Type = AtlanConnectorType.IBM_DB2 + private lateinit var connection: Connection + + private val testFile = "assets.csv" + private val revisedFile = "revised.csv" + + private val files = + listOf( + testFile, + "debug.log", + ) + + private fun prepFile(connectionQN: String = connection.qualifiedName) { + // Prepare a copy of the file with unique names for connections + val input = Paths.get("src", "test", "resources", testFile).toFile() + val output = Paths.get(testDirectory, testFile).toFile() + input.useLines { lines -> + lines.forEach { line -> + val revised = + line + .replace("iceberg", "ibmdb2") + .replace("{{CONNECTION}}", connectionQN) + output.appendText("$revised\n") + } + } + } + + private fun modifyFile(connectionQN: String = connection.qualifiedName) { + // Modify the loaded file to make some changes (testing upsert) + val input = Paths.get(testDirectory, testFile).toFile() + val output = 
Paths.get(testDirectory, revisedFile).toFile() + input.useLines { lines -> + lines.forEach { line -> + if (!line.contains("VIEW")) { + val revised = + line + .replace("A schema", "Revised schema description") + output.appendText("$revised\n") + } + } + } + // Create some net-new assets + output.appendText("$connectionQN/DB/SCH/VIEW2,View,VIEW2,ibmdb2,$connectionQN,$connectionQN/DB,DB,$connectionQN/DB/SCH,SCH,,,Schema@$connectionQN/DB/SCH\n") + output.appendText("$connectionQN/DB/SCH/MVIEW1,MaterialisedView,MVIEW1,ibmdb2,$connectionQN,$connectionQN/DB,DB,$connectionQN/DB/SCH,SCH,,,Schema@$connectionQN/DB/SCH\n") + } + + private val connectionAttrs: List = + listOf( + Connection.NAME, + Connection.CONNECTOR_TYPE, + Connection.ADMIN_ROLES, + Connection.ADMIN_GROUPS, + Connection.ADMIN_USERS, + ) + + private val databaseAttrs: List = + listOf( + Database.NAME, + Database.CONNECTION_QUALIFIED_NAME, + Database.CONNECTOR_TYPE, + Database.DESCRIPTION, + Database.SCHEMAS, + ) + + private val schemaAttrs: List = + listOf( + Schema.NAME, + Schema.CONNECTION_QUALIFIED_NAME, + Schema.CONNECTOR_TYPE, + Schema.DESCRIPTION, + Schema.DATABASE_NAME, + Schema.DATABASE_QUALIFIED_NAME, + Schema.TABLES, + Schema.VIEWS, + Schema.MATERIALIZED_VIEWS, + ) + + private val tableAttrs: List = + listOf( + Table.NAME, + Table.STATUS, + Table.CONNECTION_QUALIFIED_NAME, + Table.CONNECTOR_TYPE, + Table.DATABASE_NAME, + Table.DATABASE_QUALIFIED_NAME, + Table.SCHEMA_NAME, + Table.SCHEMA_QUALIFIED_NAME, + Table.SCHEMA, + ) + + private fun createConnection(): Connection { + val c1 = Connection.creator(client, conn1, conn1Type).build() + val response = c1.save(client).block() + return response.getResult(c1) + } + + override fun setup() { + connection = createConnection() + prepFile() + runCustomPackage( + AssetImportCfg( + assetsFile = Paths.get(testDirectory, testFile).toString(), + assetsUpsertSemantic = "upsert", + assetsFailOnErrors = true, + assetsDeltaSemantic = "full", + ), + Importer::main, + ) 
+ } + + override fun teardown() { + removeConnection(conn1, conn1Type) + } + + @Test(groups = ["aim.ctud.create"]) + fun connection1Created() { + validateConnection() + } + + private fun validateConnection() { + val found = Connection.findByName(client, conn1, conn1Type, connectionAttrs) + assertNotNull(found) + assertEquals(1, found.size) + val c1 = found[0] + assertEquals(conn1, c1.name) + assertEquals(conn1Type, c1.connectorType) + } + + @Test(groups = ["aim.ctud.create"]) + fun database1Created() { + validateDatabase() + } + + private fun validateDatabase() { + val c1 = Connection.findByName(client, conn1, conn1Type, connectionAttrs)[0]!! + val request = + Database + .select(client) + .where(Database.CONNECTION_QUALIFIED_NAME.eq(c1.qualifiedName)) + .includesOnResults(databaseAttrs) + .includeOnRelations(Schema.NAME) + .toRequest() + val response = retrySearchUntil(request, 1) + val found = response.assets + assertEquals(1, found.size) + val db = found[0] as Database + assertEquals("DB", db.name) + assertEquals(c1.qualifiedName, db.connectionQualifiedName) + assertEquals(conn1Type, db.connectorType) + assertEquals(1, db.schemas.size) + assertEquals("SCH", db.schemas.first().name) + } + + @Test(groups = ["aim.ctud.create"]) + fun schema1Created() { + validateSchema("A schema") + } + + private fun validateSchema(description: String) { + val c1 = Connection.findByName(client, conn1, conn1Type, connectionAttrs)[0]!! 
+ val request = + Schema + .select(client) + .where(Schema.CONNECTION_QUALIFIED_NAME.eq(c1.qualifiedName)) + .includesOnResults(schemaAttrs) + .includeOnRelations(Asset.NAME) + .toRequest() + val response = retrySearchUntil(request, 1) + val found = response.assets + assertEquals(1, found.size) + val sch = found[0] as Schema + assertEquals("SCH", sch.name) + assertEquals(description, sch.description) + assertEquals(c1.qualifiedName, sch.connectionQualifiedName) + assertEquals(conn1Type, sch.connectorType) + assertEquals("DB", sch.databaseName) + assertTrue(sch.databaseQualifiedName.endsWith("/DB")) + assertEquals(1, sch.tables.size) + assertEquals("TBL", sch.tables.first().name) + if (description == "Revised schema description") { + assertEquals(1, sch.views.size) + assertEquals("VIEW2", sch.views.first().name) + assertEquals(1, sch.materializedViews.size) + assertEquals("MVIEW1", sch.materializedViews.first().name) + } else { + assertEquals(1, sch.views.size) + assertEquals("VIEW", sch.views.first().name) + } + } + + @Test(groups = ["aim.ctud.create"]) + fun table1Created() { + validateTable() + } + + private fun validateTable() { + val c1 = Connection.findByName(client, conn1, conn1Type, connectionAttrs)[0]!! 
+ val request = + Table + .select(client) + .where(Table.CONNECTION_QUALIFIED_NAME.eq(c1.qualifiedName)) + .includesOnResults(tableAttrs) + .includeOnRelations(Schema.NAME) + .toRequest() + val response = retrySearchUntil(request, 1) + val found = response.assets + assertEquals(1, found.size) + val tbl = found[0] as Table + assertEquals("TBL", tbl.name) + assertEquals(c1.qualifiedName, tbl.connectionQualifiedName) + assertEquals(conn1Type, tbl.connectorType) + assertEquals("DB", tbl.databaseName) + assertTrue(tbl.databaseQualifiedName.endsWith("/DB")) + assertEquals("SCH", tbl.schemaName) + assertTrue(tbl.schemaQualifiedName.endsWith("/DB/SCH")) + assertEquals("SCH", tbl.schema.name) + } + + @Test(groups = ["aim.ctud.create"]) + fun view1Created() { + validateView() + } + + private fun validateView(exists: Boolean = true) { + val c1 = Connection.findByName(client, conn1, conn1Type, connectionAttrs)[0]!! + if (!exists) { + val request = + View + .select(client, true) + .where(View.CONNECTION_QUALIFIED_NAME.eq(c1.qualifiedName)) + .where(View.STATUS.eq(AtlanStatus.DELETED)) + .includesOnResults(tableAttrs) + .toRequest() + val response = retrySearchUntil(request, 1, true) + val found = response.assets + assertEquals(1, found.size) + val view = found[0] as View + if (view.status != AtlanStatus.DELETED) { + logger.error { "Exact request: ${request.toJson(client)}" } + logger.error { "Exact response: ${response.rawJsonObject}" } + } + assertEquals(AtlanStatus.DELETED, view.status) + } else { + val request = + View + .select(client) + .where(View.CONNECTION_QUALIFIED_NAME.eq(c1.qualifiedName)) + .includesOnResults(tableAttrs) + .includeOnRelations(Schema.NAME) + .toRequest() + val response = retrySearchUntil(request, 1) + val found = response.assets + assertEquals(1, found.size) + val view = found[0] as View + assertEquals("VIEW", view.name) + assertEquals(c1.qualifiedName, view.connectionQualifiedName) + assertEquals(conn1Type, view.connectorType) + assertEquals("SCH", 
view.schema.name) + } + } + + @Test(groups = ["aim.ctud.create"]) + fun connectionCacheCreated() { + validateConnectionCache() + } + + private fun validateConnectionCache(created: Boolean = true) { + val c1 = Connection.findByName(client, conn1, conn1Type, connectionAttrs)[0]!! + val dbFile = Paths.get(testDirectory, "connection-cache", "${c1.qualifiedName}.sqlite").toFile() + assertTrue(dbFile.isFile) + assertTrue(dbFile.exists()) + val cache = PersistentConnectionCache(dbFile.path) + val assets = cache.listAssets() + assertNotNull(assets) + assertFalse(assets.isEmpty()) + if (created) { + assertEquals(4, assets.size) + assertEquals(setOf(Database.TYPE_NAME, Schema.TYPE_NAME, Table.TYPE_NAME, View.TYPE_NAME), assets.map { it.typeName }.toSet()) + assertEquals(1, assets.count { it.typeName == Table.TYPE_NAME }) + assertEquals(1, assets.count { it.typeName == View.TYPE_NAME }) + assertEquals(setOf("VIEW"), assets.filter { it.typeName == View.TYPE_NAME }.map { it.name }.toSet()) + } else { + assertEquals(5, assets.size) + assertEquals(setOf(Database.TYPE_NAME, Schema.TYPE_NAME, Table.TYPE_NAME, View.TYPE_NAME, MaterializedView.TYPE_NAME), assets.map { it.typeName }.toSet()) + assertEquals(1, assets.count { it.typeName == Table.TYPE_NAME }) + assertEquals(1, assets.count { it.typeName == View.TYPE_NAME }) + assertEquals(1, assets.count { it.typeName == MaterializedView.TYPE_NAME }) + assertEquals(setOf("VIEW2"), assets.filter { it.typeName == View.TYPE_NAME }.map { it.name }.toSet()) + } + } + + @Test(groups = ["aim.ctud.runUpdate"], dependsOnGroups = ["aim.ctud.create"]) + fun upsertRevisions() { + modifyFile() + runCustomPackage( + AssetImportCfg( + assetsFile = Paths.get(testDirectory, revisedFile).toString(), + assetsUpsertSemantic = "upsert", + assetsFailOnErrors = true, + assetsDeltaSemantic = "full", + assetsDeltaReloadCalculation = "changes", + assetsPreviousFileDirect = Paths.get(testDirectory, testFile).toString(), + ), + Importer::main, + ) + // Allow 
Elastic index and deletion to become consistent + Thread.sleep(15000) + val c1 = Connection.findByName(client, conn1, conn1Type, connectionAttrs)[0]!! + val request = + MaterializedView + .select(client) + .where(MaterializedView.CONNECTION_QUALIFIED_NAME.eq(c1.qualifiedName)) + .where(MaterializedView.NAME.eq("MVIEW1")) + .toRequest() + retrySearchUntil(request, 1) + } + + @Test(groups = ["aim.ctud.update"], dependsOnGroups = ["aim.ctud.runUpdate"]) + fun connectionUnchanged() { + validateConnection() + } + + @Test(groups = ["aim.ctud.update"], dependsOnGroups = ["aim.ctud.runUpdate"]) + fun databaseUnchanged() { + validateDatabase() + } + + @Test(groups = ["aim.ctud.update"], dependsOnGroups = ["aim.ctud.runUpdate"]) + fun schemaChanged() { + validateSchema("Revised schema description") + } + + @Test(groups = ["aim.ctud.update"], dependsOnGroups = ["aim.ctud.runUpdate"]) + fun tableUnchanged() { + validateTable() + } + + @Test(groups = ["aim.ctud.update"], dependsOnGroups = ["aim.ctud.runUpdate"]) + fun viewRemoved() { + validateView(false) + } + + @Test(groups = ["aim.ctud.update"], dependsOnGroups = ["aim.ctud.runUpdate"]) + fun entirelyNewView() { + val c1 = Connection.findByName(client, conn1, conn1Type, connectionAttrs)[0]!! 
+ val request = + View + .select(client) + .where(View.CONNECTION_QUALIFIED_NAME.eq(c1.qualifiedName)) + .includesOnResults(tableAttrs) + .includeOnRelations(Schema.NAME) + .toRequest() + val response = retrySearchUntil(request, 1) + val found = response.assets + assertEquals(1, found.size) + val view = found[0] as View + assertEquals("VIEW2", view.name) + assertEquals(c1.qualifiedName, view.connectionQualifiedName) + assertEquals(conn1Type, view.connectorType) + assertEquals("SCH", view.schema.name) + } + + @Test(groups = ["aim.ctud.update"], dependsOnGroups = ["aim.ctud.runUpdate"]) + fun connectionCacheUpdated() { + validateConnectionCache(false) + } + + @Test(dependsOnGroups = ["aim.ctud.*"]) + fun filesCreated() { + validateFilesExist(files) + } + + @Test(dependsOnGroups = ["aim.ctud.*"]) + fun previousRunFilesCreated() { + val c1 = Connection.findByName(client, conn1, conn1Type, connectionAttrs)[0]!! + val directory = Paths.get(testDirectory, Importer.PREVIOUS_FILES_PREFIX, c1.qualifiedName).toFile() + assertNotNull(directory) + assertTrue(directory.isDirectory) + val files = directory.walkTopDown().filter { it.isFile }.toList() + assertEquals(2, files.size) + } + + @Test(dependsOnGroups = ["aim.ctud.*"]) + fun errorFreeLog() { + validateErrorFreeLog() + } +} diff --git a/samples/packages/relational-assets-builder/src/test/kotlin/com/atlan/pkg/rab/DeltaTest.kt b/samples/packages/asset-import/src/test/kotlin/com/atlan/pkg/aim/DeltaTest.kt similarity index 72% rename from samples/packages/relational-assets-builder/src/test/kotlin/com/atlan/pkg/rab/DeltaTest.kt rename to samples/packages/asset-import/src/test/kotlin/com/atlan/pkg/aim/DeltaTest.kt index 4a4c356e4a..46b5941e23 100644 --- a/samples/packages/relational-assets-builder/src/test/kotlin/com/atlan/pkg/rab/DeltaTest.kt +++ b/samples/packages/asset-import/src/test/kotlin/com/atlan/pkg/aim/DeltaTest.kt @@ -1,10 +1,9 @@ /* SPDX-License-Identifier: Apache-2.0 Copyright 2023 Atlan Pte. Ltd. 
*/ -package com.atlan.pkg.rab +package com.atlan.pkg.aim -import com.atlan.model.assets.Column -import com.atlan.model.assets.Connection -import com.atlan.model.assets.Table +import com.atlan.model.assets.MaterializedView +import com.atlan.model.assets.Schema import com.atlan.model.assets.View import com.atlan.model.enums.AtlanConnectorType import com.atlan.pkg.PackageTest @@ -20,7 +19,7 @@ import kotlin.test.assertEquals /** * Test pre-processing of full-load CSV files to detect which assets should be removed. */ -class DeltaTest : PackageTest("rd") { +class DeltaTest : PackageTest("aid") { override val logger = Utils.getLogger(this.javaClass.name) private val conn1 = makeUnique("c1") @@ -56,7 +55,7 @@ class DeltaTest : PackageTest("rd") { lines.forEach { line -> val revised = line - .replace("{{CONNECTION1}}", conn1) + .replace("{{CONNECTION}}", conn1QN) output.appendText("$revised\n") } } @@ -82,7 +81,7 @@ class DeltaTest : PackageTest("rd") { @Test fun totalAssetsToDelete() { - assertEquals(3, delta!!.assetsToDelete.size) + assertEquals(1, delta!!.assetsToDelete.size) val types = delta!! 
.assetsToDelete @@ -90,29 +89,22 @@ class DeltaTest : PackageTest("rd") { .map { it.key.typeName } .toList() .toSet() - assertEquals(2, types.size) + assertEquals(1, types.size) assertTrue(types.contains(View.TYPE_NAME)) - assertTrue(types.contains(Column.TYPE_NAME)) } @Test fun specificAssetsToDelete() { delta!!.assetsToDelete.entrySet().forEach { when (it.key.typeName) { - View.TYPE_NAME -> assertTrue("$conn1QN/TEST_DB/TEST_SCHEMA/TEST_VIEW" == it.key.qualifiedName) - Column.TYPE_NAME -> { - assertTrue( - "$conn1QN/TEST_DB/TEST_SCHEMA/TEST_VIEW/COL3" == it.key.qualifiedName || - "$conn1QN/TEST_DB/TEST_SCHEMA/TEST_VIEW/COL4" == it.key.qualifiedName, - ) - } + View.TYPE_NAME -> assertTrue("$conn1QN/DB/SCH/VIEW" == it.key.qualifiedName) } } } @Test fun totalAssetsToReload() { - assertEquals(3, delta!!.assetsToReload.size) + assertEquals(2, delta!!.assetsToReload.size) val types = delta!! .assetsToReload @@ -120,19 +112,17 @@ class DeltaTest : PackageTest("rd") { .map { it.key.typeName } .toList() .toSet() - assertEquals(3, types.size) - assertTrue(types.contains(Connection.TYPE_NAME)) - assertTrue(types.contains(Table.TYPE_NAME)) - assertTrue(types.contains(Column.TYPE_NAME)) + assertEquals(2, types.size) + assertTrue(types.contains(Schema.TYPE_NAME)) + assertTrue(types.contains(MaterializedView.TYPE_NAME)) } @Test fun specificAssetsToReload() { delta!!.assetsToReload.entrySet().forEach { when (it.key.typeName) { - Connection.TYPE_NAME -> assertEquals(conn1QN, it.key.qualifiedName) - Table.TYPE_NAME -> assertEquals("$conn1QN/TEST_DB/TEST_SCHEMA/TEST_TBL", it.key.qualifiedName) - Column.TYPE_NAME -> assertEquals("$conn1QN/TEST_DB/TEST_SCHEMA/TEST_TBL/COL2", it.key.qualifiedName) + Schema.TYPE_NAME -> assertEquals("$conn1QN/DB/SCH", it.key.qualifiedName) + MaterializedView.TYPE_NAME -> assertEquals("$conn1QN/DB/SCH/MVIEW", it.key.qualifiedName) } } } diff --git a/samples/packages/asset-import/src/test/kotlin/com/atlan/pkg/aim/InvalidProductTest.kt 
b/samples/packages/asset-import/src/test/kotlin/com/atlan/pkg/aim/InvalidProductTest.kt new file mode 100644 index 0000000000..421935649d --- /dev/null +++ b/samples/packages/asset-import/src/test/kotlin/com/atlan/pkg/aim/InvalidProductTest.kt @@ -0,0 +1,66 @@ +/* SPDX-License-Identifier: Apache-2.0 + Copyright 2023 Atlan Pte. Ltd. */ +package com.atlan.pkg.aim + +import AssetImportCfg +import com.atlan.pkg.PackageTest +import com.atlan.pkg.Utils +import java.nio.file.Paths +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith + +class InvalidProductTest : PackageTest("idr") { + override val logger = Utils.getLogger(this.javaClass.name) + + private val dataProduct1 = makeUnique("p1") + private val testFile = "product_without_domain.csv" + + private val files = + listOf( + testFile, + "debug.log", + ) + + private fun prepFile() { + // Prepare a copy of the file with unique names for domains and products + val input = Paths.get("src", "test", "resources", testFile).toFile() + val output = Paths.get(testDirectory, testFile).toFile() + input.useLines { lines -> + lines.forEach { line -> + val revised = + line + .replace("{{DATAPRODUCT1}}", dataProduct1) + output.appendText("$revised\n") + } + } + } + + override fun setup() { + prepFile() + } + + @Test + fun failsWithMeaningfulError() { + val exception = + assertFailsWith { + runCustomPackage( + AssetImportCfg( + dataProductsFile = Paths.get(testDirectory, testFile).toString(), + dataProductsUpsertSemantic = "upsert", + dataProductsFailOnErrors = true, + ), + Importer::main, + ) + } + assertEquals( + "No dataDomain provided for the data product, cannot be processed.", + exception.message, + ) + } + + @Test + fun filesCreated() { + validateFilesExist(files) + } +} diff --git a/samples/packages/asset-import/src/test/kotlin/com/atlan/pkg/aim/MultiConnectionDeltaTest.kt b/samples/packages/asset-import/src/test/kotlin/com/atlan/pkg/aim/MultiConnectionDeltaTest.kt new file mode 100644 
index 0000000000..a2f3d0458a --- /dev/null +++ b/samples/packages/asset-import/src/test/kotlin/com/atlan/pkg/aim/MultiConnectionDeltaTest.kt @@ -0,0 +1,82 @@ +/* SPDX-License-Identifier: Apache-2.0 + Copyright 2023 Atlan Pte. Ltd. */ +package com.atlan.pkg.aim + +import AssetImportCfg +import com.atlan.model.assets.Connection +import com.atlan.model.enums.AtlanConnectorType +import com.atlan.pkg.PackageTest +import com.atlan.pkg.Utils +import java.nio.file.Paths +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith + +/** + * Test import of an invalid tag value. + */ +class MultiConnectionDeltaTest : PackageTest("mcd") { + override val logger = Utils.getLogger(this.javaClass.name) + + private val table = makeUnique("t1") + + private val testFile = "multi-connection-delta.csv" + + private val files = + listOf( + testFile, + "debug.log", + ) + + private fun prepFile( + connectionQN1: String, + connectionQN2: String, + ) { + val input = Paths.get("src", "test", "resources", testFile).toFile() + val output = Paths.get(testDirectory, testFile).toFile() + input.useLines { lines -> + lines.forEach { line -> + val revised = + line + .replace("{{CONNECTION1}}", connectionQN1) + .replace("{{CONNECTION2}}", connectionQN2) + .replace("{{TABLE}}", table) + output.appendText("$revised\n") + } + } + } + + override fun setup() { + val snowflakeConnection = Connection.findByName(client, "production", AtlanConnectorType.SNOWFLAKE)?.get(0)!! + val mssqlConnection = Connection.findByName(client, "production", AtlanConnectorType.MSSQL)?.get(0)!! 
+ prepFile(snowflakeConnection.qualifiedName, mssqlConnection.qualifiedName) + } + + @Test + fun failsWithMeaningfulError() { + val exception = + assertFailsWith { + runCustomPackage( + AssetImportCfg( + assetsFile = Paths.get(testDirectory, testFile).toString(), + assetsUpsertSemantic = "upsert", + assetsFailOnErrors = true, + assetsDeltaSemantic = "full", + ), + Importer::main, + ) + } + assertEquals( + """ + Assets in multiple connections detected in the input file. + Full delta processing currently only works for a single connection per input file, exiting. + """.trimIndent(), + exception.message, + ) + } + + @Test + fun filesCreated() { + validateFilesExist(files) + } +} diff --git a/samples/packages/asset-import/src/test/resources/assets.csv b/samples/packages/asset-import/src/test/resources/assets.csv new file mode 100644 index 0000000000..9af810e2d0 --- /dev/null +++ b/samples/packages/asset-import/src/test/resources/assets.csv @@ -0,0 +1,5 @@ +qualifiedName,typeName,name,connectorType,connectionQualifiedName,databaseQualifiedName,databaseName,schemaQualifiedName,schemaName,description,database,schema +{{CONNECTION}}/DB,Database,DB,iceberg,{{CONNECTION}},,,,,,, +{{CONNECTION}}/DB/SCH,Schema,SCH,iceberg,{{CONNECTION}},{{CONNECTION}}/DB,DB,,,A schema,Database@{{CONNECTION}}/DB, +{{CONNECTION}}/DB/SCH/TBL,Table,TBL,iceberg,{{CONNECTION}},{{CONNECTION}}/DB,DB,{{CONNECTION}}/DB/SCH,SCH,,,Schema@{{CONNECTION}}/DB/SCH +{{CONNECTION}}/DB/SCH/VIEW,View,VIEW,iceberg,{{CONNECTION}},{{CONNECTION}}/DB,DB,{{CONNECTION}}/DB/SCH,SCH,,,Schema@{{CONNECTION}}/DB/SCH diff --git a/samples/packages/asset-import/src/test/resources/assets_latest.csv b/samples/packages/asset-import/src/test/resources/assets_latest.csv new file mode 100644 index 0000000000..85638f5cfa --- /dev/null +++ b/samples/packages/asset-import/src/test/resources/assets_latest.csv @@ -0,0 +1,5 @@ 
+qualifiedName,typeName,name,connectorType,connectionQualifiedName,databaseQualifiedName,databaseName,schemaQualifiedName,schemaName,description,database,schema +{{CONNECTION}}/DB,Database,DB,iceberg,{{CONNECTION}},,,,,,, +{{CONNECTION}}/DB/SCH,Schema,SCH,iceberg,{{CONNECTION}},{{CONNECTION}}/DB,DB,,,A revised schema,Database@{{CONNECTION}}/DB, +{{CONNECTION}}/DB/SCH/TBL,Table,TBL,iceberg,{{CONNECTION}},{{CONNECTION}}/DB,DB,{{CONNECTION}}/DB/SCH,SCH,,,Schema@{{CONNECTION}}/DB/SCH +{{CONNECTION}}/DB/SCH/MVIEW,MaterialisedView,MVIEW,iceberg,{{CONNECTION}},{{CONNECTION}}/DB,DB,{{CONNECTION}}/DB/SCH,SCH,,,Schema@{{CONNECTION}}/DB/SCH diff --git a/samples/packages/asset-import/src/test/resources/multi-connection-delta.csv b/samples/packages/asset-import/src/test/resources/multi-connection-delta.csv new file mode 100644 index 0000000000..a4fec09832 --- /dev/null +++ b/samples/packages/asset-import/src/test/resources/multi-connection-delta.csv @@ -0,0 +1,3 @@ +qualifiedName,typeName,name,displayName,description,userDescription,ownerUsers,ownerGroups,certificateStatus,certificateStatusMessage,announcementType,announcementTitle,announcementMessage,assignedTerms,atlanTags,links,readme,starredDetails,connectorType,connectionQualifiedName,databaseQualifiedName,databaseName,schemaQualifiedName,schemaName +{{CONNECTION1}}/ANALYTICS/WIDE_WORLD_IMPORTERS/{{TABLE}},Table,{{TABLE}},,,,,,VERIFIED,,,,,,NON_EXISTENT_TAG,,,,snowflake,{{CONNECTION1}},{{CONNECTION1}}/ANALYTICS,ANALYTICS,{{CONNECTION1}}/ANALYTICS/WIDE_WORLD_IMPORTERS,WIDE_WORLD_IMPORTERS +{{CONNECTION2}}/ANALYTICS/WIDE_WORLD_IMPORTERS/{{TABLE}},Table,{{TABLE}},,,,,,VERIFIED,,,,,,NON_EXISTENT_TAG,,,,snowflake,{{CONNECTION2}},{{CONNECTION2}}/ANALYTICS,ANALYTICS,{{CONNECTION2}}/ANALYTICS/WIDE_WORLD_IMPORTERS,WIDE_WORLD_IMPORTERS diff --git a/samples/packages/asset-import/src/test/resources/product_without_domain.csv b/samples/packages/asset-import/src/test/resources/product_without_domain.csv new file mode 100644 index 
0000000000..a200951395 --- /dev/null +++ b/samples/packages/asset-import/src/test/resources/product_without_domain.csv @@ -0,0 +1,2 @@ +qualifiedName,typeName,name,assetIcon,assetThemeHex,assetCoverImage,parentDomain,dataDomain,dataProductAssetsDSL,displayName,description,userDescription,ownerUsers,ownerGroups,certificateStatus,certificateStatusMessage,announcementType,announcementTitle,announcementMessage,readme +,DataProduct,{{DATAPRODUCT1}},,,,,,"""{\""query\"": {\""attributes\"": [\""__traitNames\"", \""connectorName\"", \""__customAttributes\"", \""certificateStatus\"", \""tenantId\"", \""anchor\"", \""parentQualifiedName\"", \""Query.parentQualifiedName\"", \""AtlasGlossaryTerm.anchor\"", \""databaseName\"", \""schemaName\"", \""parent\"", \""connectionQualifiedName\"", \""collectionQualifiedName\"", \""announcementMessage\"", \""announcementTitle\"", \""announcementType\"", \""announcementUpdatedAt\"", \""announcementUpdatedBy\"", \""allowQuery\"", \""allowQueryPreview\"", \""adminGroups\"", \""adminRoles\"", \""adminUsers\"", \""category\"", \""credentialStrategy\"", \""connectionSSOCredentialGuid\"", \""certificateStatus\"", \""certificateUpdatedAt\"", \""certificateUpdatedBy\"", \""classifications\"", \""connectionId\"", \""connectionQualifiedName\"", \""connectorName\"", \""dataType\"", \""defaultDatabaseQualifiedName\"", \""defaultSchemaQualifiedName\"", \""description\"", \""displayName\"", \""links\"", \""link\"", \""meanings\"", \""name\"", \""ownerGroups\"", \""ownerUsers\"", \""qualifiedName\"", \""typeName\"", \""userDescription\"", \""displayDescription\"", \""subDataType\"", \""rowLimit\"", \""queryTimeout\"", \""previewCredentialStrategy\"", \""policyStrategy\"", \""policyStrategyForSamplePreview\"", \""useObjectStorage\"", \""objectStorageUploadThreshold\"", \""outputPortDataProducts\""], \""dsl\"": {\""from\"": 0, \""query\"": {\""bool\"": {\""filter\"": {\""bool\"": {\""filter\"": [{\""term\"": {\""__state\"": {\""value\"": \""ACTIVE\"", 
\""case_insensitive\"": false}}}, {\""term\"": {\""__typeName.keyword\"": {\""value\"": \""AtlasGlossaryTerm\"", \""case_insensitive\"": false}}}]}}}}}, \""suppressLogs\"": true}, \""filterScrubbed\"": true}""",,,Test data product,chris,admins,,,,,,
diff --git a/samples/packages/relational-assets-builder/build.gradle.kts b/samples/packages/relational-assets-builder/build.gradle.kts index bcf8c2cbfb..b5f7b6aead 100644 --- a/samples/packages/relational-assets-builder/build.gradle.kts +++ b/samples/packages/relational-assets-builder/build.gradle.kts @@ -1,21 +1,41 @@ // SPDX-License-Identifier: Apache-2.0 version = providers.gradleProperty("VERSION_NAME").get() +val jarPath = "$rootDir/jars" val jarName = "relational-assets-builder" plugins { id("com.atlan.kotlin-custom-package") + alias(libs.plugins.shadow) `maven-publish` signing } +dependencies { + implementation(project(":samples:packages:asset-import")) +} + java { withSourcesJar() withJavadocJar() } tasks { - jar { + shadowJar { + isZip64 = true + archiveClassifier.set("") archiveBaseName.set(jarName) + destinationDirectory.set(file(jarPath)) + dependencies { + include(project(":samples:packages:asset-import")) + } + mergeServiceFiles() + dependsOn(":package-toolkit:runtime:genPklConnectors") + } + jar { + // Override the default jar task so we get the shadowed jar + // as the only jar output + actions = listOf() + doLast { shadowJar } } getByName("sourcesJar") { dependsOn("genCustomPkg") diff --git a/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/AssetImporter.kt b/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/AssetImporter.kt deleted file mode 100644 index 48d33fd2ee..0000000000 --- a/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/AssetImporter.kt +++ /dev/null @@ -1,149 +0,0 @@ -/* SPDX-License-Identifier: Apache-2.0 - Copyright 2023 Atlan Pte. Ltd. 
*/ -package com.atlan.pkg.rab - -import RelationalAssetsBuilderCfg -import com.atlan.model.assets.Asset -import com.atlan.model.assets.Column -import com.atlan.model.assets.Connection -import com.atlan.model.assets.Database -import com.atlan.model.assets.ISQL -import com.atlan.model.assets.MaterializedView -import com.atlan.model.assets.Schema -import com.atlan.model.assets.Table -import com.atlan.model.assets.View -import com.atlan.model.enums.AssetCreationHandling -import com.atlan.pkg.PackageContext -import com.atlan.pkg.Utils -import com.atlan.pkg.serde.csv.CSVImporter -import com.atlan.pkg.serde.csv.CSVXformer -import com.atlan.pkg.serde.csv.ImportResults -import com.atlan.pkg.util.AssetResolver -import com.atlan.pkg.util.AssetResolver.ConnectionIdentity -import com.atlan.pkg.util.AssetResolver.QualifiedNameDetails -import com.atlan.pkg.util.DeltaProcessor -import mu.KLogger - -/** - * Import assets into Atlan from a provided CSV file. - * - * Only the assets and attributes in the provided CSV file will attempt to be loaded. - * By default, any blank values in a cell in the CSV file will be ignored. If you would like any - * particular column's blank values to actually overwrite (i.e. remove) existing values for that - * asset in Atlan, then add that column's field to getAttributesToOverwrite. 
- * - * @param ctx context through which this package is running - * @param delta the processor containing any details about file deltas - * @param filename name of the file to import - * @param typeNameFilter asset types to which to restrict loading - * @param logger through which to record logging - * @param creationHandling what to do with assets that do not exist (create full, partial, or ignore) - * @param batchSize maximum number of records to save per API request - * @param trackBatches if true, minimal details about every asset created or updated is tracked (if false, only counts of each are tracked) - */ -abstract class AssetImporter( - ctx: PackageContext, - private val delta: DeltaProcessor?, - filename: String, - typeNameFilter: String, - logger: KLogger, - creationHandling: AssetCreationHandling = Utils.getCreationHandling(ctx.config.assetsUpsertSemantic, AssetCreationHandling.FULL), - batchSize: Int = ctx.config.assetsBatchSize.toInt(), - trackBatches: Boolean = ctx.config.trackBatches, -) : CSVImporter( - ctx = ctx, - filename = filename, - logger = logger, - typeNameFilter = typeNameFilter, - attrsToOverwrite = attributesToClear(ctx.config.assetsAttrToOverwrite.toMutableList(), "assets", logger), - creationHandling = creationHandling, - batchSize = batchSize, - trackBatches = trackBatches, - fieldSeparator = ctx.config.assetsFieldSeparator[0], - failOnErrors = ctx.config.assetsFailOnErrors, - ) { - /** {@inheritDoc} */ - override fun import(columnsToSkip: Set): ImportResults? 
{ - // Can skip all of these columns when deserializing a row as they will be set by - // the creator methods anyway - return super.import( - setOf( - Asset.CONNECTION_NAME.atlanFieldName, - // ConnectionImporter.CONNECTOR_TYPE, // Let this be loaded, for mis-named connections - ISQL.DATABASE_NAME.atlanFieldName, - ISQL.SCHEMA_NAME.atlanFieldName, - ENTITY_NAME, - ColumnImporter.COLUMN_PARENT_QN, - Column.ORDER.atlanFieldName, - ), - ) - } - - companion object : AssetResolver { - const val ENTITY_NAME = "entityName" - - /** {@inheritDoc} */ - override fun getQualifiedNameDetails( - row: List, - header: List, - typeName: String, - ): QualifiedNameDetails { - val parent: QualifiedNameDetails? - val current: String - when (typeName) { - Connection.TYPE_NAME -> { - val connection = CSVXformer.trimWhitespace(row[header.indexOf(Asset.CONNECTION_NAME.atlanFieldName)]) - val connector = CSVXformer.trimWhitespace(row[header.indexOf(ConnectionImporter.CONNECTOR_TYPE)]).lowercase() - current = ConnectionIdentity(connection, connector).toString() - parent = null - } - Database.TYPE_NAME -> { - current = CSVXformer.trimWhitespace(row[header.indexOf(ISQL.DATABASE_NAME.atlanFieldName)]) - parent = getQualifiedNameDetails(row, header, Connection.TYPE_NAME) - } - Schema.TYPE_NAME -> { - current = CSVXformer.trimWhitespace(row[header.indexOf(ISQL.SCHEMA_NAME.atlanFieldName)]) - parent = getQualifiedNameDetails(row, header, Database.TYPE_NAME) - } - Table.TYPE_NAME, View.TYPE_NAME, MaterializedView.TYPE_NAME -> { - current = CSVXformer.trimWhitespace(row[header.indexOf(ENTITY_NAME)]) - parent = getQualifiedNameDetails(row, header, Schema.TYPE_NAME) - } - Column.TYPE_NAME -> { - current = CSVXformer.trimWhitespace(row[header.indexOf(ColumnImporter.COLUMN_NAME)]) - parent = getQualifiedNameDetails(row, header, Table.TYPE_NAME) - } - else -> throw IllegalStateException("Unknown SQL type: $typeName") - } - val unique = - parent?.let { - if (parent.uniqueQN.isBlank()) current else 
"${parent.uniqueQN}/$current" - } ?: current - val partial = - parent?.let { - if (parent.partialQN.isBlank()) current else "${parent.partialQN}/$current" - } ?: "" - return QualifiedNameDetails( - unique, - partial, - parent?.uniqueQN ?: "", - parent?.partialQN ?: "", - ) - } - } - - /** {@inheritDoc} */ - override fun includeRow( - row: List, - header: List, - typeIdx: Int, - qnIdx: Int, - ): Boolean { - if (super.includeRow(row, header, typeIdx, qnIdx)) { - delta?.resolveAsset(row, header)?.let { identity -> - return delta.reloadAsset(identity) - } ?: return true - } - return false - } -} diff --git a/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/AssetXformer.kt b/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/AssetXformer.kt new file mode 100644 index 0000000000..7e1d5a568a --- /dev/null +++ b/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/AssetXformer.kt @@ -0,0 +1,244 @@ +/* SPDX-License-Identifier: Apache-2.0 + Copyright 2023 Atlan Pte. Ltd. 
*/ +package com.atlan.pkg.rab + +import RelationalAssetsBuilderCfg +import com.atlan.model.assets.Asset +import com.atlan.model.assets.Column +import com.atlan.model.assets.Connection +import com.atlan.model.assets.Database +import com.atlan.model.assets.ISQL +import com.atlan.model.assets.MaterializedView +import com.atlan.model.assets.Schema +import com.atlan.model.assets.Table +import com.atlan.model.assets.View +import com.atlan.pkg.PackageContext +import com.atlan.pkg.serde.RowSerde +import com.atlan.pkg.serde.csv.CSVXformer +import com.atlan.pkg.util.AssetResolver.ConnectionIdentity +import mu.KLogger + +abstract class AssetXformer( + private val ctx: PackageContext, + completeHeaders: List, + val typeNameFilter: String, + val preprocessedDetails: Importer.Results, + private val logger: KLogger, +) : CSVXformer( + inputFile = preprocessedDetails.preprocessedFile, + targetHeader = completeHeaders, + logger = logger, + fieldSeparator = ctx.config.assetsFieldSeparator[0], + ) { + /** {@inheritDoc} */ + override fun mapRow(inputRow: Map): List> { + val assetMap = mapAsset(inputRow) + val valueList = mutableListOf() + targetHeader!!.forEach { header -> + if (header != null) { + // Look for the transformed value first, then fallback to passing through what came in the input + val transformed = assetMap.getOrElse(header) { inputRow.getOrElse(header) { "" } } + valueList.add(transformed) + } + } + return listOf(valueList) + } + + /** {@inheritDoc} */ + override fun includeRow(inputRow: Map): Boolean = trimWhitespace(inputRow.getOrElse(Asset.TYPE_NAME.atlanFieldName) { "" }) == typeNameFilter + + abstract fun mapAsset(inputRow: Map): Map + + companion object { + const val ENTITY_NAME = "entityName" + + val BASE_OUTPUT_HEADERS = + listOf( + RowSerde.getHeaderForField(Asset.QUALIFIED_NAME), + RowSerde.getHeaderForField(Asset.TYPE_NAME), + RowSerde.getHeaderForField(Asset.NAME), + RowSerde.getHeaderForField(Asset.CONNECTOR_TYPE), + 
RowSerde.getHeaderForField(Asset.CONNECTION_QUALIFIED_NAME), + RowSerde.getHeaderForField(Database.SCHEMA_COUNT, Database::class.java), + RowSerde.getHeaderForField(Schema.DATABASE_NAME, Schema::class.java), + RowSerde.getHeaderForField(Schema.DATABASE_QUALIFIED_NAME, Schema::class.java), + RowSerde.getHeaderForField(Schema.DATABASE, Schema::class.java), + RowSerde.getHeaderForField(Schema.TABLE_COUNT, Schema::class.java), + RowSerde.getHeaderForField(Schema.VIEW_COUNT, Schema::class.java), + RowSerde.getHeaderForField(Table.SCHEMA_NAME, Table::class.java), + RowSerde.getHeaderForField(Table.SCHEMA_QUALIFIED_NAME, Table::class.java), + RowSerde.getHeaderForField(Table.SCHEMA, Table::class.java), + RowSerde.getHeaderForField(Table.COLUMN_COUNT, Table::class.java), + RowSerde.getHeaderForField(Column.TABLE_NAME, Column::class.java), + RowSerde.getHeaderForField(Column.TABLE_QUALIFIED_NAME, Column::class.java), + RowSerde.getHeaderForField(Column.TABLE, Column::class.java), + RowSerde.getHeaderForField(Column.VIEW_NAME, Column::class.java), + RowSerde.getHeaderForField(Column.VIEW_QUALIFIED_NAME, Column::class.java), + RowSerde.getHeaderForField(Column.VIEW, Column::class.java), + RowSerde.getHeaderForField(Column.MATERIALIZED_VIEW, Column::class.java), + RowSerde.getHeaderForField(Column.ORDER, Column::class.java), + RowSerde.getHeaderForField(Column.RAW_DATA_TYPE_DEFINITION, Column::class.java), + RowSerde.getHeaderForField(Column.PRECISION, Column::class.java), + RowSerde.getHeaderForField(Column.NUMERIC_SCALE, Column::class.java), + RowSerde.getHeaderForField(Column.MAX_LENGTH, Column::class.java), + ) + + fun getConnectionQN( + ctx: PackageContext, + inputRow: Map, + ): String { + val connectorType = trimWhitespace(inputRow.getOrElse("connectorType") { "" }) + val connectionName = trimWhitespace(inputRow.getOrElse(Asset.CONNECTION_NAME.atlanFieldName) { "" }) + val connectionId = ConnectionIdentity(connectionName, connectorType) + return 
ctx.connectionCache.getIdentityMap().getOrDefault(connectionId, "") + } + + fun getConnectorType(inputRow: Map): String = trimWhitespace(inputRow.getOrElse("connectorType") { "" }) + + /** + * Attempt to resolve the full SQL hierarchy details of a row (asset). + * Note: when the entityQualifiedNameToType is not passed, only very limited details can be resolved and returned, + * so use with caution unless you're able to provide the entityQualifiedNameToType map. + * + * @param row of data, representing a single asset + * @param typeName of that single row's asset + * @param entityQualifiedNameToType a map from unresolved (unique) qualifiedName of an asset to its type + */ + fun getSQLHierarchyDetails( + row: Map, + typeName: String, + entityQualifiedNameToType: Map? = null, + ): SQLHierarchyDetails { + val parent: SQLHierarchyDetails? + val current: String + var actualTypeName = typeName + when (typeName) { + Connection.TYPE_NAME -> { + val connection = trimWhitespace(row.getOrElse(Asset.CONNECTION_NAME.atlanFieldName) { "" }) + val connector = trimWhitespace(row.getOrElse(ConnectionImporter.CONNECTOR_TYPE) { "" }).lowercase() + current = ConnectionIdentity(connection, connector).toString() + parent = null + } + Database.TYPE_NAME -> { + current = trimWhitespace(row.getOrElse(ISQL.DATABASE_NAME.atlanFieldName) { "" }) + parent = getSQLHierarchyDetails(row, Connection.TYPE_NAME, entityQualifiedNameToType) + } + Schema.TYPE_NAME -> { + current = trimWhitespace(row.getOrElse(ISQL.SCHEMA_NAME.atlanFieldName) { "" }) + parent = getSQLHierarchyDetails(row, Database.TYPE_NAME, entityQualifiedNameToType) + } + "CONTAINER", Table.TYPE_NAME, View.TYPE_NAME, MaterializedView.TYPE_NAME -> { + current = trimWhitespace(row.getOrElse(ENTITY_NAME) { "" }) + parent = getSQLHierarchyDetails(row, Schema.TYPE_NAME, entityQualifiedNameToType) + // Only do this lookup if we have been passed a map -- otherwise this is detail that cannot + // yet be resolved (and will not yet be used, 
either) + if (entityQualifiedNameToType != null) { + actualTypeName = + entityQualifiedNameToType.getOrElse("${parent.uniqueQN}/$current") { + throw IllegalStateException("Could not find any table/view at: ${parent.uniqueQN}/$current") + } + } + } + Column.TYPE_NAME -> { + current = trimWhitespace(row.getOrElse(ColumnXformer.COLUMN_NAME) { "" }) + parent = getSQLHierarchyDetails(row, "CONTAINER", entityQualifiedNameToType) + } + else -> throw IllegalStateException("Unknown SQL type: $typeName") + } + val unique = + parent?.let { + if (parent.uniqueQN.isBlank()) current else "${parent.uniqueQN}/$current" + } ?: current + val partial = + parent?.let { + if (parent.partialQN.isBlank()) current else "${parent.partialQN}/$current" + } ?: "" + var databaseName = "" + var databasePQN = "" + var schemaName = "" + var schemaPQN = "" + var tableName = "" + var tablePQN = "" + var viewName = "" + var viewPQN = "" + when (actualTypeName) { + Schema.TYPE_NAME -> { + databaseName = parent?.name ?: "" + databasePQN = parent?.partialQN ?: "" + } + Table.TYPE_NAME -> { + databaseName = parent?.databaseName ?: "" + databasePQN = parent?.databasePQN ?: "" + schemaName = parent?.name ?: "" + schemaPQN = parent?.partialQN ?: "" + } + View.TYPE_NAME -> { + databaseName = parent?.databaseName ?: "" + databasePQN = parent?.databasePQN ?: "" + schemaName = parent?.name ?: "" + schemaPQN = parent?.partialQN ?: "" + } + MaterializedView.TYPE_NAME -> { + databaseName = parent?.databaseName ?: "" + databasePQN = parent?.databasePQN ?: "" + schemaName = parent?.name ?: "" + schemaPQN = parent?.partialQN ?: "" + } + Column.TYPE_NAME -> { + databaseName = parent?.databaseName ?: "" + databasePQN = parent?.databasePQN ?: "" + schemaName = parent?.schemaName ?: "" + schemaPQN = parent?.schemaPQN ?: "" + val parentType = parent?.typeName ?: "" + when (parentType) { + Table.TYPE_NAME -> { + tableName = parent?.name ?: "" + tablePQN = parent?.partialQN ?: "" + } + View.TYPE_NAME, 
MaterializedView.TYPE_NAME -> { + viewName = parent?.name ?: "" + viewPQN = parent?.partialQN ?: "" + } + } + } + } + return SQLHierarchyDetails( + current, + partial, + unique, + actualTypeName, + parent?.name ?: "", + parent?.partialQN ?: "", + parent?.uniqueQN ?: "", + parent?.typeName ?: "", + databaseName, + databasePQN, + schemaName, + schemaPQN, + tableName, + tablePQN, + viewName, + viewPQN, + ) + } + } + + data class SQLHierarchyDetails( + val name: String, + val partialQN: String, + val uniqueQN: String, + val typeName: String, + val parentName: String, + val parentPartialQN: String, + val parentUniqueQN: String, + val parentTypeName: String, + val databaseName: String, + val databasePQN: String, + val schemaName: String, + val schemaPQN: String, + val tableName: String, + val tablePQN: String, + val viewName: String, + val viewPQN: String, + ) +} diff --git a/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/ColumnImporter.kt b/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/ColumnImporter.kt deleted file mode 100644 index c5e19d4674..0000000000 --- a/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/ColumnImporter.kt +++ /dev/null @@ -1,67 +0,0 @@ -/* SPDX-License-Identifier: Apache-2.0 - Copyright 2023 Atlan Pte. Ltd. */ -package com.atlan.pkg.rab - -import RelationalAssetsBuilderCfg -import com.atlan.model.assets.Asset -import com.atlan.model.assets.Column -import com.atlan.pkg.PackageContext -import com.atlan.pkg.serde.RowDeserializer -import com.atlan.pkg.serde.cell.DataTypeXformer -import com.atlan.pkg.util.DeltaProcessor -import mu.KLogger - -/** - * Import columns into Atlan from a provided CSV file. - * - * Only the columns and attributes in the provided CSV file will attempt to be loaded. - * By default, any blank values in a cell in the CSV file will be ignored. If you would like any - * particular column's blank values to actually overwrite (i.e. 
remove) existing values for that - * asset in Atlan, then add that column's field to getAttributesToOverwrite. - * - * @param ctx context through which this package is running - * @param delta the processor containing any details about file deltas - * @param preprocessed details of the preprocessed CSV file - * @param connectionImporter that was used to import connections - * @param logger through which to record logging - */ -class ColumnImporter( - ctx: PackageContext, - private val delta: DeltaProcessor, - private val preprocessed: Importer.Results, - private val connectionImporter: ConnectionImporter, - logger: KLogger, -) : AssetImporter( - ctx = ctx, - delta = delta, - filename = preprocessed.preprocessedFile, - typeNameFilter = Column.TYPE_NAME, - logger = logger, - ) { - companion object { - const val COLUMN_PARENT_QN = "columnParentQualifiedName" - const val COLUMN_NAME = "columnName" - } - - /** {@inheritDoc} */ - override fun getBuilder(deserializer: RowDeserializer): Asset.AssetBuilder<*, *> { - val name = deserializer.getValue(COLUMN_NAME)?.let { it as String } ?: "" - val order = deserializer.getValue(Column.ORDER.atlanFieldName)?.let { it as Int } ?: 0 - val qnDetails = getQualifiedNameDetails(deserializer.row, deserializer.heading, typeNameFilter) - val connectionQN = connectionImporter.getBuilder(deserializer).build().qualifiedName - val parentQN = "$connectionQN/${qnDetails.parentPartialQN}" - val parentType = preprocessed.entityQualifiedNameToType[qnDetails.parentUniqueQN] ?: throw IllegalStateException("Could not find any table/view at: ${qnDetails.parentUniqueQN}") - val builder = Column.creator(name, parentType, parentQN, order) - val rawDataType = deserializer.getRawValue(Column.DATA_TYPE.atlanFieldName) - if (rawDataType.isNotBlank()) { - builder.rawDataTypeDefinition(rawDataType) - if (!rawDataType.contains("<") && !rawDataType.contains(">")) { - // Only attempt to parse things like precision, scale and max-length if this is not a complex 
type - DataTypeXformer.getPrecision(rawDataType)?.let { builder.precision(it) } - DataTypeXformer.getScale(rawDataType)?.let { builder.numericScale(it) } - DataTypeXformer.getMaxLength(rawDataType)?.let { builder.maxLength(it) } - } - } - return builder - } -} diff --git a/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/ColumnXformer.kt b/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/ColumnXformer.kt new file mode 100644 index 0000000000..69c0bced27 --- /dev/null +++ b/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/ColumnXformer.kt @@ -0,0 +1,78 @@ +/* SPDX-License-Identifier: Apache-2.0 + Copyright 2023 Atlan Pte. Ltd. */ +package com.atlan.pkg.rab + +import RelationalAssetsBuilderCfg +import com.atlan.model.assets.Asset +import com.atlan.model.assets.Column +import com.atlan.model.assets.MaterializedView +import com.atlan.model.assets.Table +import com.atlan.model.assets.View +import com.atlan.pkg.PackageContext +import com.atlan.pkg.serde.RowSerde +import com.atlan.pkg.serde.cell.DataTypeXformer +import mu.KLogger + +class ColumnXformer( + private val ctx: PackageContext, + completeHeaders: List, + preprocessedDetails: Importer.Results, + private val logger: KLogger, +) : AssetXformer( + ctx = ctx, + completeHeaders = completeHeaders, + typeNameFilter = Column.TYPE_NAME, + preprocessedDetails = preprocessedDetails, + logger = logger, + ) { + companion object { + const val COLUMN_PARENT_QN = "columnParentQualifiedName" + const val COLUMN_NAME = "columnName" + } + + override fun mapAsset(inputRow: Map): Map { + val connectionQN = getConnectionQN(ctx, inputRow) + val details = getSQLHierarchyDetails(inputRow, typeNameFilter, preprocessedDetails.entityQualifiedNameToType) + val assetQN = "$connectionQN/${details.partialQN}" + val parentQN = "$connectionQN/${details.parentPartialQN}" + val rawDataType = trimWhitespace(inputRow.getOrElse(Column.DATA_TYPE.atlanFieldName) { "" }) + 
var precision: Int? = null + var scale: Double? = null + var maxLength: Long? = null + if (rawDataType.isNotBlank()) { + if (!rawDataType.contains("<") && !rawDataType.contains(">")) { + // Only attempt to parse things like precision, scale and max-length if this is not a complex type + precision = DataTypeXformer.getPrecision(rawDataType) + scale = DataTypeXformer.getScale(rawDataType) + maxLength = DataTypeXformer.getMaxLength(rawDataType) + } + } + return if (assetQN.isNotBlank()) { + return mapOf( + RowSerde.getHeaderForField(Asset.QUALIFIED_NAME) to assetQN, + RowSerde.getHeaderForField(Asset.TYPE_NAME) to typeNameFilter, + RowSerde.getHeaderForField(Asset.NAME) to details.name, + RowSerde.getHeaderForField(Asset.CONNECTOR_TYPE) to getConnectorType(inputRow), + RowSerde.getHeaderForField(Asset.CONNECTION_QUALIFIED_NAME) to connectionQN, + RowSerde.getHeaderForField(Column.DATABASE_NAME, Column::class.java) to details.databaseName, + RowSerde.getHeaderForField(Column.DATABASE_QUALIFIED_NAME, Column::class.java) to "$connectionQN/${details.databasePQN}", + RowSerde.getHeaderForField(Column.SCHEMA_NAME, Column::class.java) to details.schemaName, + RowSerde.getHeaderForField(Column.SCHEMA_QUALIFIED_NAME, Column::class.java) to "$connectionQN/${details.schemaPQN}", + RowSerde.getHeaderForField(Column.TABLE_NAME, Column::class.java) to details.tableName, + RowSerde.getHeaderForField(Column.TABLE_QUALIFIED_NAME, Column::class.java) to if (details.tablePQN.isNotBlank()) "$connectionQN/${details.tablePQN}" else "", + RowSerde.getHeaderForField(Column.TABLE, Column::class.java) to if (details.parentTypeName == Table.TYPE_NAME) "${details.parentTypeName}@$parentQN" else "", + RowSerde.getHeaderForField(Column.VIEW_NAME, Column::class.java) to details.viewName, + RowSerde.getHeaderForField(Column.VIEW_QUALIFIED_NAME, Column::class.java) to if (details.viewPQN.isNotBlank()) "$connectionQN/${details.viewPQN}" else "", + RowSerde.getHeaderForField(Column.VIEW, 
Column::class.java) to if (details.parentTypeName == View.TYPE_NAME) "${details.parentTypeName}@$parentQN" else "", + RowSerde.getHeaderForField(Column.MATERIALIZED_VIEW, Column::class.java) to if (details.parentTypeName == MaterializedView.TYPE_NAME) "${details.parentTypeName}@$parentQN" else "", + RowSerde.getHeaderForField(Column.ORDER, Column::class.java) to inputRow.getOrElse(Column.ORDER.atlanFieldName) { "" }, + RowSerde.getHeaderForField(Column.RAW_DATA_TYPE_DEFINITION, Column::class.java) to rawDataType, + RowSerde.getHeaderForField(Column.PRECISION, Column::class.java) to (precision?.toString() ?: ""), + RowSerde.getHeaderForField(Column.NUMERIC_SCALE, Column::class.java) to (scale?.toString() ?: ""), + RowSerde.getHeaderForField(Column.MAX_LENGTH, Column::class.java) to (maxLength?.toString() ?: ""), + ) + } else { + mapOf() + } + } +} diff --git a/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/ConnectionImporter.kt b/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/ConnectionImporter.kt index ca3b6be9d5..7d18e6ce69 100644 --- a/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/ConnectionImporter.kt +++ b/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/ConnectionImporter.kt @@ -4,11 +4,15 @@ package com.atlan.pkg.rab import RelationalAssetsBuilderCfg import com.atlan.model.assets.Asset +import com.atlan.model.assets.Column import com.atlan.model.assets.Connection +import com.atlan.model.assets.ISQL import com.atlan.model.enums.AssetCreationHandling import com.atlan.model.enums.AtlanConnectorType import com.atlan.pkg.PackageContext import com.atlan.pkg.serde.RowDeserializer +import com.atlan.pkg.serde.csv.CSVImporter +import com.atlan.pkg.serde.csv.ImportResults import mu.KLogger import java.util.stream.Stream @@ -21,29 +25,45 @@ import java.util.stream.Stream * asset in Atlan, then add that column's field to getAttributesToOverwrite. 
* * @param ctx context through which this package is running - * @param preprocessed details of the preprocessed CSV file + * @param inputFile the input file containing connection details * @param logger through which to record logging */ class ConnectionImporter( ctx: PackageContext, - private val preprocessed: Importer.Results, + private val inputFile: String, logger: KLogger, -) : AssetImporter( +) : CSVImporter( ctx = ctx, - delta = null, - filename = preprocessed.preprocessedFile, - // Only allow full or updates to connections, as partial connections would be hidden - // and impossible to delete via utilities like the Connection Delete workflow - typeNameFilter = Connection.TYPE_NAME, + filename = inputFile, logger = logger, + typeNameFilter = Connection.TYPE_NAME, + attrsToOverwrite = attributesToClear(ctx.config.assetsAttrToOverwrite.toMutableList(), "assets", logger), creationHandling = if (ctx.config.assetsUpsertSemantic == "update") AssetCreationHandling.NONE else AssetCreationHandling.FULL, batchSize = 1, trackBatches = true, + fieldSeparator = ctx.config.assetsFieldSeparator[0], + failOnErrors = ctx.config.assetsFailOnErrors, ) { companion object { const val CONNECTOR_TYPE = "connectorType" } + /** {@inheritDoc} */ + override fun import(columnsToSkip: Set): ImportResults? 
{ + // Can skip all of these columns when deserializing a row as they will be set by + // the creator methods anyway + return super.import( + setOf( + Asset.CONNECTION_NAME.atlanFieldName, + ISQL.DATABASE_NAME.atlanFieldName, + ISQL.SCHEMA_NAME.atlanFieldName, + AssetXformer.ENTITY_NAME, + ColumnXformer.COLUMN_PARENT_QN, + Column.ORDER.atlanFieldName, + ), + ) + } + /** {@inheritDoc} */ @Suppress("UNCHECKED_CAST") override fun getBuilder(deserializer: RowDeserializer): Asset.AssetBuilder<*, *> { diff --git a/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/ContainerXformer.kt b/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/ContainerXformer.kt new file mode 100644 index 0000000000..944ac67b55 --- /dev/null +++ b/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/ContainerXformer.kt @@ -0,0 +1,49 @@ +/* SPDX-License-Identifier: Apache-2.0 + Copyright 2023 Atlan Pte. Ltd. */ +package com.atlan.pkg.rab + +import RelationalAssetsBuilderCfg +import com.atlan.model.assets.Asset +import com.atlan.model.assets.Table +import com.atlan.pkg.PackageContext +import com.atlan.pkg.serde.RowSerde +import mu.KLogger + +abstract class ContainerXformer( + private val ctx: PackageContext, + completeHeaders: List, + typeNameFilter: String, + preprocessedDetails: Importer.Results, + private val logger: KLogger, +) : AssetXformer( + ctx = ctx, + completeHeaders = completeHeaders, + typeNameFilter = typeNameFilter, + preprocessedDetails = preprocessedDetails, + logger = logger, + ) { + override fun mapAsset(inputRow: Map): Map { + val connectionQN = getConnectionQN(ctx, inputRow) + val details = getSQLHierarchyDetails(inputRow, typeNameFilter, preprocessedDetails.entityQualifiedNameToType) + val assetQN = "$connectionQN/${details.partialQN}" + val parentQN = "$connectionQN/${details.parentPartialQN}" + val columnCount = preprocessedDetails.qualifiedNameToChildCount[details.uniqueQN]?.toLong() + return if 
(assetQN.isNotBlank()) { + return mapOf( + RowSerde.getHeaderForField(Asset.QUALIFIED_NAME) to assetQN, + RowSerde.getHeaderForField(Asset.TYPE_NAME) to typeNameFilter, + RowSerde.getHeaderForField(Asset.NAME) to details.name, + RowSerde.getHeaderForField(Asset.CONNECTOR_TYPE) to getConnectorType(inputRow), + RowSerde.getHeaderForField(Asset.CONNECTION_QUALIFIED_NAME) to connectionQN, + RowSerde.getHeaderForField(Table.DATABASE_NAME, Table::class.java) to details.databaseName, + RowSerde.getHeaderForField(Table.DATABASE_QUALIFIED_NAME, Table::class.java) to "$connectionQN/${details.databasePQN}", + RowSerde.getHeaderForField(Table.SCHEMA_NAME, Table::class.java) to details.parentName, + RowSerde.getHeaderForField(Table.SCHEMA_QUALIFIED_NAME, Table::class.java) to parentQN, + RowSerde.getHeaderForField(Table.SCHEMA, Table::class.java) to "Schema@$parentQN", + RowSerde.getHeaderForField(Table.COLUMN_COUNT, Table::class.java) to (columnCount?.toString() ?: ""), + ) + } else { + mapOf() + } + } +} diff --git a/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/DatabaseImporter.kt b/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/DatabaseImporter.kt deleted file mode 100644 index ba035ec790..0000000000 --- a/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/DatabaseImporter.kt +++ /dev/null @@ -1,49 +0,0 @@ -/* SPDX-License-Identifier: Apache-2.0 - Copyright 2023 Atlan Pte. Ltd. */ -package com.atlan.pkg.rab - -import RelationalAssetsBuilderCfg -import com.atlan.model.assets.Asset -import com.atlan.model.assets.Database -import com.atlan.pkg.PackageContext -import com.atlan.pkg.serde.RowDeserializer -import com.atlan.pkg.util.DeltaProcessor -import mu.KLogger - -/** - * Import databases into Atlan from a provided CSV file. - * - * Only the databases and attributes in the provided CSV file will attempt to be loaded. - * By default, any blank values in a cell in the CSV file will be ignored. 
If you would like any - * particular column's blank values to actually overwrite (i.e. remove) existing values for that - * asset in Atlan, then add that column's field to getAttributesToOverwrite. - * - * @param ctx context through which this package is running - * @param delta the processor containing any details about file deltas - * @param preprocessed details of the preprocessed CSV file - * @param connectionImporter that was used to import connections - * @param logger through which to record logging - */ -class DatabaseImporter( - ctx: PackageContext, - private val delta: DeltaProcessor, - private val preprocessed: Importer.Results, - private val connectionImporter: ConnectionImporter, - logger: KLogger, -) : AssetImporter( - ctx = ctx, - delta = delta, - filename = preprocessed.preprocessedFile, - typeNameFilter = Database.TYPE_NAME, - logger = logger, - ) { - /** {@inheritDoc} */ - override fun getBuilder(deserializer: RowDeserializer): Asset.AssetBuilder<*, *> { - val name = deserializer.getValue(Database.DATABASE_NAME.atlanFieldName)?.let { it as String } ?: "" - val connectionQN = connectionImporter.getBuilder(deserializer).build().qualifiedName - val qnDetails = getQualifiedNameDetails(deserializer.row, deserializer.heading, typeNameFilter) - return Database - .creator(name, connectionQN) - .schemaCount(preprocessed.qualifiedNameToChildCount[qnDetails.uniqueQN]?.toInt()) - } -} diff --git a/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/DatabaseXformer.kt b/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/DatabaseXformer.kt new file mode 100644 index 0000000000..ed334b890c --- /dev/null +++ b/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/DatabaseXformer.kt @@ -0,0 +1,42 @@ +/* SPDX-License-Identifier: Apache-2.0 + Copyright 2023 Atlan Pte. Ltd. 
*/ +package com.atlan.pkg.rab + +import RelationalAssetsBuilderCfg +import com.atlan.model.assets.Asset +import com.atlan.model.assets.Database +import com.atlan.pkg.PackageContext +import com.atlan.pkg.serde.RowSerde +import mu.KLogger + +class DatabaseXformer( + private val ctx: PackageContext, + completeHeaders: List, + preprocessedDetails: Importer.Results, + private val logger: KLogger, +) : AssetXformer( + ctx = ctx, + completeHeaders = completeHeaders, + typeNameFilter = Database.TYPE_NAME, + preprocessedDetails = preprocessedDetails, + logger = logger, + ) { + override fun mapAsset(inputRow: Map): Map { + val connectionQN = getConnectionQN(ctx, inputRow) + val details = getSQLHierarchyDetails(inputRow, typeNameFilter, preprocessedDetails.entityQualifiedNameToType) + val assetQN = "$connectionQN/${details.partialQN}" + val schemaCount = preprocessedDetails.qualifiedNameToChildCount[details.uniqueQN]?.toInt() + return if (assetQN.isNotBlank()) { + return mapOf( + RowSerde.getHeaderForField(Asset.QUALIFIED_NAME) to assetQN, + RowSerde.getHeaderForField(Asset.TYPE_NAME) to typeNameFilter, + RowSerde.getHeaderForField(Asset.NAME) to details.name, + RowSerde.getHeaderForField(Asset.CONNECTOR_TYPE) to getConnectorType(inputRow), + RowSerde.getHeaderForField(Asset.CONNECTION_QUALIFIED_NAME) to connectionQN, + RowSerde.getHeaderForField(Database.SCHEMA_COUNT, Database::class.java) to (schemaCount?.toString() ?: ""), + ) + } else { + mapOf() + } + } +} diff --git a/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/Importer.kt b/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/Importer.kt index 923dd2bfb9..91cded1407 100644 --- a/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/Importer.kt +++ b/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/Importer.kt @@ -2,9 +2,8 @@ Copyright 2023 Atlan Pte. Ltd. 
*/ package com.atlan.pkg.rab +import AssetImportCfg import RelationalAssetsBuilderCfg -import com.atlan.exception.AtlanException -import com.atlan.model.assets.Asset import com.atlan.model.assets.Column import com.atlan.model.assets.Connection import com.atlan.model.assets.Database @@ -12,16 +11,20 @@ import com.atlan.model.assets.MaterializedView import com.atlan.model.assets.Schema import com.atlan.model.assets.Table import com.atlan.model.assets.View -import com.atlan.model.enums.AtlanConnectorType import com.atlan.pkg.PackageContext import com.atlan.pkg.Utils -import com.atlan.pkg.rab.AssetImporter.Companion.getQualifiedNameDetails +import com.atlan.pkg.rab.AssetXformer.Companion.BASE_OUTPUT_HEADERS +import com.atlan.pkg.rab.AssetXformer.Companion.getSQLHierarchyDetails import com.atlan.pkg.serde.FieldSerde import com.atlan.pkg.serde.csv.CSVPreprocessor import com.atlan.pkg.serde.csv.CSVXformer -import com.atlan.pkg.serde.csv.ImportResults +import com.atlan.pkg.serde.csv.CSVXformer.Companion.getHeader import com.atlan.pkg.util.DeltaProcessor +import de.siegmar.fastcsv.writer.CsvWriter +import de.siegmar.fastcsv.writer.LineDelimiter +import de.siegmar.fastcsv.writer.QuoteStrategies import java.nio.file.Paths +import java.nio.file.StandardOpenOption import java.util.concurrent.atomic.AtomicInteger import kotlin.system.exitProcess @@ -47,12 +50,11 @@ object Importer { * * @param ctx context in which this package is running * @param outputDirectory in which to do any data processing - * @return the qualifiedName of the connection that was delta-processed, or null if no delta-processing enabled */ fun import( ctx: PackageContext, outputDirectory: String = "tmp", - ): String? 
{ + ) { val assetsUpload = ctx.config.importType == "DIRECT" val assetsFilename = ctx.config.assetsFile val assetsKey = ctx.config.assetsKey @@ -80,26 +82,6 @@ object Importer { ctx.config.assetsPrefix, assetsKey, ) - val targetHeaders = CSVXformer.getHeader(assetsInput, fieldSeparator).toMutableList() - // Inject two columns at the end that we need for column assets - targetHeaders.add(Column.ORDER.atlanFieldName) - targetHeaders.add(ColumnImporter.COLUMN_PARENT_QN) - val revisedFile = Paths.get("$assetsInput.CSA_RAB.csv") - val preprocessedDetails = - Preprocessor(assetsInput, fieldSeparator, ctx.config.deltaSemantic == "full") - .preprocess( - outputFile = revisedFile.toString(), - outputHeaders = targetHeaders, - ) - - // Only cache links and terms if there are any in the CSV, otherwise this - // will be unnecessary work - if (preprocessedDetails.hasLinks) { - ctx.linkCache.preload() - } - if (preprocessedDetails.hasTermAssignments) { - ctx.termCache.preload() - } ctx.connectionCache.preload() @@ -111,86 +93,127 @@ object Importer { // (without tracking, any connections created will NOT be cached, either, which will then cause issues // with the subsequent processing steps.) 
// We also need to load these connections first, irrespective of any delta calculation, so that - // we can be certain we will be able to resolve the cube's qualifiedName (for subsequent processing) - val connectionImporter = ConnectionImporter(ctx, preprocessedDetails, logger) + // we can be certain we will be able to resolve the assets' qualifiedNames (for subsequent processing) + val connectionImporter = ConnectionImporter(ctx, assetsInput, logger) connectionImporter.import()?.close() - val connectionQN = - if (ctx.config.deltaSemantic == "full") { - val connectionIdentity = ConnectionIdentity.fromString(preprocessedDetails.assetRootName) - try { - val list = Connection.findByName(ctx.client, connectionIdentity.name, AtlanConnectorType.fromValue(connectionIdentity.type)) - list[0].qualifiedName - } catch (e: AtlanException) { - logger.error(e) { "Unable to find the unique connection in Atlan from the file: $connectionIdentity" } - exitProcess(50) - } + val transformedFile = transform(ctx, fieldSeparator, assetsInput) + + val previousFileXformed = + if (ctx.config.previousFileDirect.isNotBlank()) { + transform(ctx, fieldSeparator, ctx.config.previousFileDirect) } else { - null + "" } - val previousFileDirect = ctx.config.previousFileDirect - DeltaProcessor( - ctx = ctx, - semantic = ctx.config.deltaSemantic, - qualifiedNamePrefix = connectionQN, - removalType = ctx.config.deltaRemovalType, - previousFilesPrefix = PREVIOUS_FILES_PREFIX, - resolver = AssetImporter, - preprocessedDetails = preprocessedDetails, - typesToRemove = listOf(Database.TYPE_NAME, Schema.TYPE_NAME, Table.TYPE_NAME, View.TYPE_NAME, MaterializedView.TYPE_NAME, Column.TYPE_NAME), - logger = logger, - reloadSemantic = ctx.config.deltaReloadCalculation, - previousFilePreprocessor = - Preprocessor( - previousFileDirect, - fieldSeparator, - true, - outputFile = "$previousFileDirect.transformed.csv", + val importConfig = + AssetImportCfg( + assetsFile = transformedFile, + assetsUpsertSemantic = 
ctx.config.assetsUpsertSemantic, + assetsDeltaSemantic = ctx.config.deltaSemantic, + assetsDeltaRemovalType = ctx.config.deltaRemovalType, + assetsDeltaReloadCalculation = ctx.config.deltaReloadCalculation, + assetsPreviousFileDirect = previousFileXformed, + assetsPreviousFilePrefix = PREVIOUS_FILES_PREFIX, + assetsAttrToOverwrite = ctx.config.assetsAttrToOverwrite, + assetsFailOnErrors = ctx.config.assetsFailOnErrors, + assetsFieldSeparator = ctx.config.assetsFieldSeparator, + assetsBatchSize = ctx.config.assetsBatchSize, + trackBatches = ctx.config.trackBatches, + ) + Utils.initializeContext(importConfig, ctx).use { iCtx -> + com.atlan.pkg.aim.Importer + .import(iCtx, outputDirectory) + ?.close() + } + } + + private fun transform( + ctx: PackageContext, + fieldSeparator: Char, + inputFile: String, + ): String { + val targetHeaders = getHeader(inputFile, fieldSeparator).toMutableList() + // Inject two columns at the end that we need for column assets + targetHeaders.add(Column.ORDER.atlanFieldName) + targetHeaders.add(ColumnXformer.COLUMN_PARENT_QN) + val revisedFile = Paths.get("$inputFile.CSA_RAB.csv") + val preprocessedDetails = + Preprocessor(inputFile, fieldSeparator) + .preprocess( + outputFile = revisedFile.toString(), outputHeaders = targetHeaders, - ), - outputDirectory = outputDirectory, - ).use { delta -> + ) - delta.calculate() + // Only cache links and terms if there are any in the CSV, otherwise this + // will be unnecessary work + if (preprocessedDetails.hasLinks) { + ctx.linkCache.preload() + } + if (preprocessedDetails.hasTermAssignments) { + ctx.termCache.preload() + } - logger.info { " --- Importing databases... 
---" } - val databaseImporter = DatabaseImporter(ctx, delta, preprocessedDetails, connectionImporter, logger) - val dbResults = databaseImporter.import() + val completeHeaders = BASE_OUTPUT_HEADERS.toMutableList() + val transformedFile = "$inputFile.transformed.csv" + // Determine any non-standard RAB fields in the header and append them to the end of + // the list of standard header fields, so they're passed-through to asset import + val inputHeaders = getHeader(preprocessedDetails.preprocessedFile, fieldSeparator = fieldSeparator).toMutableList() + inputHeaders.removeAll(BASE_OUTPUT_HEADERS) + inputHeaders.removeAll( + listOf( + ColumnXformer.COLUMN_NAME, + ColumnXformer.COLUMN_PARENT_QN, + ConnectionImporter.CONNECTOR_TYPE, + AssetXformer.ENTITY_NAME, + ), + ) + inputHeaders.forEach { completeHeaders.add(it) } - logger.info { " --- Importing schemas... ---" } - val schemaImporter = SchemaImporter(ctx, delta, preprocessedDetails, connectionImporter, logger) - val schResults = schemaImporter.import() + CsvWriter + .builder() + .fieldSeparator(fieldSeparator) + .quoteCharacter('"') + .quoteStrategy(QuoteStrategies.NON_EMPTY) + .lineDelimiter(LineDelimiter.PLATFORM) + .build( + Paths.get(transformedFile), + StandardOpenOption.CREATE, + StandardOpenOption.TRUNCATE_EXISTING, + StandardOpenOption.WRITE, + ).use { writer -> + writer.writeRecord(completeHeaders) - logger.info { " --- Importing tables... ---" } - val tableImporter = TableImporter(ctx, delta, preprocessedDetails, connectionImporter, logger) - val tblResults = tableImporter.import() + logger.info { " --- Transforming databases... ---" } + val databaseXformer = DatabaseXformer(ctx, completeHeaders, preprocessedDetails, logger) + databaseXformer.transform(writer) - logger.info { " --- Importing views... ---" } - val viewImporter = ViewImporter(ctx, delta, preprocessedDetails, connectionImporter, logger) - val viewResults = viewImporter.import() + logger.info { " --- Transforming schemas... 
---" } + val schemaXformer = SchemaXformer(ctx, completeHeaders, preprocessedDetails, logger) + schemaXformer.transform(writer) - logger.info { " --- Importing materialized views... ---" } - val materializedViewImporter = MaterializedViewImporter(ctx, delta, preprocessedDetails, connectionImporter, logger) - val mviewResults = materializedViewImporter.import() + logger.info { " --- Transforming tables... ---" } + val tableXformer = TableXformer(ctx, completeHeaders, preprocessedDetails, logger) + tableXformer.transform(writer) - logger.info { " --- Importing columns... ---" } - val columnImporter = ColumnImporter(ctx, delta, preprocessedDetails, connectionImporter, logger) - val colResults = columnImporter.import() + logger.info { " --- Transforming views... ---" } + val viewXformer = ViewXformer(ctx, completeHeaders, preprocessedDetails, logger) + viewXformer.transform(writer) - delta.processDeletions() + logger.info { " --- Transforming materialized views... ---" } + val materializedViewXformer = MaterializedViewXformer(ctx, completeHeaders, preprocessedDetails, logger) + materializedViewXformer.transform(writer) - ImportResults.getAllModifiedAssets(ctx.client, true, dbResults, schResults, tblResults, viewResults, mviewResults, colResults).use { modifiedAssets -> - delta.updateConnectionCache(modifiedAssets) + logger.info { " --- Transforming columns... ---" } + val columnXformer = ColumnXformer(ctx, completeHeaders, preprocessedDetails, logger) + columnXformer.transform(writer) } - } - return connectionQN + return transformedFile } private class Preprocessor( originalFile: String, fieldSeparator: Char, - val deltaProcessing: Boolean, outputFile: String? = null, outputHeaders: List? 
= null, ) : CSVPreprocessor( @@ -205,7 +228,6 @@ object Importer { val qualifiedNameToTableCount = mutableMapOf() val qualifiedNameToViewCount = mutableMapOf() - var connectionIdentity = "" var lastParentQN = "" var columnOrder = 1 @@ -215,23 +237,9 @@ object Importer { typeIdx: Int, qnIdx: Int, ): List { - if (deltaProcessing) { - val connectionNameOnRow = row.getOrNull(header.indexOf(Asset.CONNECTION_NAME.atlanFieldName)) ?: "" - val connectionTypeOnRow = row.getOrNull(header.indexOf("connectorType")) ?: "" - val connectionIdentityOnRow = ConnectionIdentity(connectionTypeOnRow, connectionNameOnRow).toString() - if (connectionIdentity.isBlank()) { - connectionIdentity = connectionIdentityOnRow - } - if (connectionIdentity != connectionIdentityOnRow) { - logger.error { "Connection changed mid-file: $connectionIdentityOnRow -> $connectionIdentityOnRow" } - logger.error { "This package is designed to only process a single connection per input file when doing delta processing, exiting." } - exitProcess(101) - } - } - val values = row.toMutableList() val typeName = CSVXformer.trimWhitespace(values.getOrElse(typeIdx) { "" }) - val qnDetails = getQualifiedNameDetails(values, header, typeName) + val qnDetails = getSQLHierarchyDetails(CSVXformer.getRowByHeader(header, values), typeName) if (typeName !in setOf(Table.TYPE_NAME, View.TYPE_NAME, MaterializedView.TYPE_NAME)) { if (!qualifiedNameToChildCount.containsKey(qnDetails.parentUniqueQN)) { qualifiedNameToChildCount[qnDetails.parentUniqueQN] = AtomicInteger(0) @@ -282,7 +290,6 @@ object Importer { ): Results { val results = super.finalize(header, outputFile) return Results( - connectionIdentity, results.hasLinks, results.hasTermAssignments, results.outputFile!!, @@ -295,7 +302,6 @@ object Importer { } class Results( - assetRootName: String, hasLinks: Boolean, hasTermAssignments: Boolean, preprocessedFile: String, @@ -304,25 +310,9 @@ object Importer { val qualifiedNameToTableCount: Map, val qualifiedNameToViewCount: Map, 
) : DeltaProcessor.Results( - assetRootName = assetRootName, + assetRootName = "", hasLinks = hasLinks, hasTermAssignments = hasTermAssignments, preprocessedFile = preprocessedFile, ) - - data class ConnectionIdentity( - val type: String, - val name: String, - ) { - override fun toString(): String = "$type$DELIMITER$name" - - companion object { - const val DELIMITER = "::" - - fun fromString(identity: String): ConnectionIdentity { - val tokens = identity.split(DELIMITER) - return ConnectionIdentity(tokens[0], tokens[1]) - } - } - } } diff --git a/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/MaterializedViewImporter.kt b/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/MaterializedViewImporter.kt deleted file mode 100644 index 12f963ceeb..0000000000 --- a/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/MaterializedViewImporter.kt +++ /dev/null @@ -1,50 +0,0 @@ -/* SPDX-License-Identifier: Apache-2.0 - Copyright 2023 Atlan Pte. Ltd. */ -package com.atlan.pkg.rab - -import RelationalAssetsBuilderCfg -import com.atlan.model.assets.Asset -import com.atlan.model.assets.MaterializedView -import com.atlan.pkg.PackageContext -import com.atlan.pkg.serde.RowDeserializer -import com.atlan.pkg.util.DeltaProcessor -import mu.KLogger - -/** - * Import materialized views into Atlan from a provided CSV file. - * - * Only the materialized views and attributes in the provided CSV file will attempt to be loaded. - * By default, any blank values in a cell in the CSV file will be ignored. If you would like any - * particular column's blank values to actually overwrite (i.e. remove) existing values for that - * asset in Atlan, then add that column's field to getAttributesToOverwrite. 
- * - * @param ctx context through which this package is running - * @param delta the processor containing any details about file deltas - * @param preprocessed details of the preprocessed CSV file - * @param connectionImporter that was used to import connections - * @param logger through which to record logging - */ -class MaterializedViewImporter( - ctx: PackageContext, - private val delta: DeltaProcessor, - private val preprocessed: Importer.Results, - private val connectionImporter: ConnectionImporter, - logger: KLogger, -) : AssetImporter( - ctx = ctx, - delta = delta, - filename = preprocessed.preprocessedFile, - typeNameFilter = MaterializedView.TYPE_NAME, - logger = logger, - ) { - /** {@inheritDoc} */ - override fun getBuilder(deserializer: RowDeserializer): Asset.AssetBuilder<*, *> { - val name = deserializer.getValue(ENTITY_NAME)?.let { it as String } ?: "" - val connectionQN = connectionImporter.getBuilder(deserializer).build().qualifiedName - val qnDetails = getQualifiedNameDetails(deserializer.row, deserializer.heading, typeNameFilter) - val schemaQN = "$connectionQN/${qnDetails.parentPartialQN}" - return MaterializedView - .creator(name, schemaQN) - .columnCount(preprocessed.qualifiedNameToChildCount[qnDetails.uniqueQN]?.toLong()) - } -} diff --git a/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/MaterializedViewXformer.kt b/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/MaterializedViewXformer.kt new file mode 100644 index 0000000000..4c5c68da0c --- /dev/null +++ b/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/MaterializedViewXformer.kt @@ -0,0 +1,21 @@ +/* SPDX-License-Identifier: Apache-2.0 + Copyright 2023 Atlan Pte. Ltd. 
*/ +package com.atlan.pkg.rab + +import RelationalAssetsBuilderCfg +import com.atlan.model.assets.MaterializedView +import com.atlan.pkg.PackageContext +import mu.KLogger + +class MaterializedViewXformer( + private val ctx: PackageContext, + completeHeaders: List, + preprocessedDetails: Importer.Results, + private val logger: KLogger, +) : ContainerXformer( + ctx = ctx, + completeHeaders = completeHeaders, + typeNameFilter = MaterializedView.TYPE_NAME, + preprocessedDetails = preprocessedDetails, + logger = logger, + ) diff --git a/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/SchemaImporter.kt b/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/SchemaImporter.kt deleted file mode 100644 index e8b3ec70d1..0000000000 --- a/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/SchemaImporter.kt +++ /dev/null @@ -1,51 +0,0 @@ -/* SPDX-License-Identifier: Apache-2.0 - Copyright 2023 Atlan Pte. Ltd. */ -package com.atlan.pkg.rab - -import RelationalAssetsBuilderCfg -import com.atlan.model.assets.Asset -import com.atlan.model.assets.Schema -import com.atlan.pkg.PackageContext -import com.atlan.pkg.serde.RowDeserializer -import com.atlan.pkg.util.DeltaProcessor -import mu.KLogger - -/** - * Import schemas into Atlan from a provided CSV file. - * - * Only the schemas and attributes in the provided CSV file will attempt to be loaded. - * By default, any blank values in a cell in the CSV file will be ignored. If you would like any - * particular column's blank values to actually overwrite (i.e. remove) existing values for that - * asset in Atlan, then add that column's field to getAttributesToOverwrite. 
- * - * @param ctx context through which this package is running - * @param delta the processor containing any details about file deltas - * @param preprocessed details of the preprocessed CSV file - * @param connectionImporter that was used to import connections - * @param logger through which to record logging - */ -class SchemaImporter( - ctx: PackageContext, - private val delta: DeltaProcessor, - private val preprocessed: Importer.Results, - private val connectionImporter: ConnectionImporter, - logger: KLogger, -) : AssetImporter( - ctx = ctx, - delta = delta, - filename = preprocessed.preprocessedFile, - typeNameFilter = Schema.TYPE_NAME, - logger = logger, - ) { - /** {@inheritDoc} */ - override fun getBuilder(deserializer: RowDeserializer): Asset.AssetBuilder<*, *> { - val name = deserializer.getValue(Schema.SCHEMA_NAME.atlanFieldName)?.let { it as String } ?: "" - val connectionQN = connectionImporter.getBuilder(deserializer).build().qualifiedName - val qnDetails = getQualifiedNameDetails(deserializer.row, deserializer.heading, typeNameFilter) - val databaseQN = "$connectionQN/${qnDetails.parentPartialQN}" - return Schema - .creator(name, databaseQN) - .tableCount(preprocessed.qualifiedNameToTableCount[qnDetails.uniqueQN]?.toInt()) - .viewCount(preprocessed.qualifiedNameToViewCount[qnDetails.uniqueQN]?.toInt()) - } -} diff --git a/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/SchemaXformer.kt b/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/SchemaXformer.kt new file mode 100644 index 0000000000..5777bc6c56 --- /dev/null +++ b/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/SchemaXformer.kt @@ -0,0 +1,48 @@ +/* SPDX-License-Identifier: Apache-2.0 + Copyright 2023 Atlan Pte. Ltd. 
*/ +package com.atlan.pkg.rab + +import RelationalAssetsBuilderCfg +import com.atlan.model.assets.Asset +import com.atlan.model.assets.Schema +import com.atlan.pkg.PackageContext +import com.atlan.pkg.serde.RowSerde +import mu.KLogger + +class SchemaXformer( + private val ctx: PackageContext, + completeHeaders: List, + preprocessedDetails: Importer.Results, + private val logger: KLogger, +) : AssetXformer( + ctx = ctx, + completeHeaders = completeHeaders, + typeNameFilter = Schema.TYPE_NAME, + preprocessedDetails = preprocessedDetails, + logger = logger, + ) { + override fun mapAsset(inputRow: Map): Map { + val connectionQN = getConnectionQN(ctx, inputRow) + val details = getSQLHierarchyDetails(inputRow, typeNameFilter, preprocessedDetails.entityQualifiedNameToType) + val assetQN = "$connectionQN/${details.partialQN}" + val parentQN = "$connectionQN/${details.parentPartialQN}" + val tableCount = preprocessedDetails.qualifiedNameToTableCount[details.uniqueQN]?.toInt() + val viewCount = preprocessedDetails.qualifiedNameToViewCount[details.uniqueQN]?.toInt() + return if (assetQN.isNotBlank()) { + return mapOf( + RowSerde.getHeaderForField(Asset.QUALIFIED_NAME) to assetQN, + RowSerde.getHeaderForField(Asset.TYPE_NAME) to typeNameFilter, + RowSerde.getHeaderForField(Asset.NAME) to details.name, + RowSerde.getHeaderForField(Asset.CONNECTOR_TYPE) to getConnectorType(inputRow), + RowSerde.getHeaderForField(Asset.CONNECTION_QUALIFIED_NAME) to connectionQN, + RowSerde.getHeaderForField(Schema.DATABASE_NAME, Schema::class.java) to details.parentName, + RowSerde.getHeaderForField(Schema.DATABASE_QUALIFIED_NAME, Schema::class.java) to parentQN, + RowSerde.getHeaderForField(Schema.DATABASE, Schema::class.java) to "Database@$parentQN", + RowSerde.getHeaderForField(Schema.TABLE_COUNT, Schema::class.java) to (tableCount?.toString() ?: ""), + RowSerde.getHeaderForField(Schema.VIEW_COUNT, Schema::class.java) to (viewCount?.toString() ?: ""), + ) + } else { + mapOf() + } + } +} diff 
--git a/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/TableImporter.kt b/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/TableImporter.kt deleted file mode 100644 index 8fdfbfca51..0000000000 --- a/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/TableImporter.kt +++ /dev/null @@ -1,50 +0,0 @@ -/* SPDX-License-Identifier: Apache-2.0 - Copyright 2023 Atlan Pte. Ltd. */ -package com.atlan.pkg.rab - -import RelationalAssetsBuilderCfg -import com.atlan.model.assets.Asset -import com.atlan.model.assets.Table -import com.atlan.pkg.PackageContext -import com.atlan.pkg.serde.RowDeserializer -import com.atlan.pkg.util.DeltaProcessor -import mu.KLogger - -/** - * Import tables into Atlan from a provided CSV file. - * - * Only the tables and attributes in the provided CSV file will attempt to be loaded. - * By default, any blank values in a cell in the CSV file will be ignored. If you would like any - * particular column's blank values to actually overwrite (i.e. remove) existing values for that - * asset in Atlan, then add that column's field to getAttributesToOverwrite. 
- * - * @param ctx context through which this package is running - * @param delta the processor containing any details about file deltas - * @param preprocessed details of the preprocessed CSV file - * @param connectionImporter that was used to import connections - * @param logger through which to record logging - */ -class TableImporter( - ctx: PackageContext, - private val delta: DeltaProcessor, - private val preprocessed: Importer.Results, - private val connectionImporter: ConnectionImporter, - logger: KLogger, -) : AssetImporter( - ctx = ctx, - delta = delta, - filename = preprocessed.preprocessedFile, - typeNameFilter = Table.TYPE_NAME, - logger = logger, - ) { - /** {@inheritDoc} */ - override fun getBuilder(deserializer: RowDeserializer): Asset.AssetBuilder<*, *> { - val name = deserializer.getValue(ENTITY_NAME)?.let { it as String } ?: "" - val connectionQN = connectionImporter.getBuilder(deserializer).build().qualifiedName - val qnDetails = getQualifiedNameDetails(deserializer.row, deserializer.heading, typeNameFilter) - val schemaQN = "$connectionQN/${qnDetails.parentPartialQN}" - return Table - .creator(name, schemaQN) - .columnCount(preprocessed.qualifiedNameToChildCount[qnDetails.uniqueQN]?.toLong()) - } -} diff --git a/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/TableXformer.kt b/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/TableXformer.kt new file mode 100644 index 0000000000..b3e4dfc5f3 --- /dev/null +++ b/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/TableXformer.kt @@ -0,0 +1,21 @@ +/* SPDX-License-Identifier: Apache-2.0 + Copyright 2023 Atlan Pte. Ltd. 
*/ +package com.atlan.pkg.rab + +import RelationalAssetsBuilderCfg +import com.atlan.model.assets.Table +import com.atlan.pkg.PackageContext +import mu.KLogger + +class TableXformer( + private val ctx: PackageContext, + completeHeaders: List, + preprocessedDetails: Importer.Results, + private val logger: KLogger, +) : ContainerXformer( + ctx = ctx, + completeHeaders = completeHeaders, + typeNameFilter = Table.TYPE_NAME, + preprocessedDetails = preprocessedDetails, + logger = logger, + ) diff --git a/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/ViewImporter.kt b/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/ViewImporter.kt deleted file mode 100644 index da188d6e06..0000000000 --- a/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/ViewImporter.kt +++ /dev/null @@ -1,50 +0,0 @@ -/* SPDX-License-Identifier: Apache-2.0 - Copyright 2023 Atlan Pte. Ltd. */ -package com.atlan.pkg.rab - -import RelationalAssetsBuilderCfg -import com.atlan.model.assets.Asset -import com.atlan.model.assets.View -import com.atlan.pkg.PackageContext -import com.atlan.pkg.serde.RowDeserializer -import com.atlan.pkg.util.DeltaProcessor -import mu.KLogger - -/** - * Import views into Atlan from a provided CSV file. - * - * Only the views and attributes in the provided CSV file will attempt to be loaded. - * By default, any blank values in a cell in the CSV file will be ignored. If you would like any - * particular column's blank values to actually overwrite (i.e. remove) existing values for that - * asset in Atlan, then add that column's field to getAttributesToOverwrite. 
- * - * @param ctx context through which this package is running - * @param delta the processor containing any details about file deltas - * @param preprocessed details of the preprocessed CSV file - * @param connectionImporter that was used to import connections - * @param logger through which to record logging - */ -class ViewImporter( - ctx: PackageContext, - private val delta: DeltaProcessor, - private val preprocessed: Importer.Results, - private val connectionImporter: ConnectionImporter, - logger: KLogger, -) : AssetImporter( - ctx = ctx, - delta = delta, - filename = preprocessed.preprocessedFile, - typeNameFilter = View.TYPE_NAME, - logger = logger, - ) { - /** {@inheritDoc} */ - override fun getBuilder(deserializer: RowDeserializer): Asset.AssetBuilder<*, *> { - val name = deserializer.getValue(ENTITY_NAME)?.let { it as String } ?: "" - val connectionQN = connectionImporter.getBuilder(deserializer).build().qualifiedName - val qnDetails = getQualifiedNameDetails(deserializer.row, deserializer.heading, typeNameFilter) - val schemaQN = "$connectionQN/${qnDetails.parentPartialQN}" - return View - .creator(name, schemaQN) - .columnCount(preprocessed.qualifiedNameToChildCount[qnDetails.uniqueQN]?.toLong()) - } -} diff --git a/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/ViewXformer.kt b/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/ViewXformer.kt new file mode 100644 index 0000000000..aced120519 --- /dev/null +++ b/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/ViewXformer.kt @@ -0,0 +1,21 @@ +/* SPDX-License-Identifier: Apache-2.0 + Copyright 2023 Atlan Pte. Ltd. 
*/ +package com.atlan.pkg.rab + +import RelationalAssetsBuilderCfg +import com.atlan.model.assets.View +import com.atlan.pkg.PackageContext +import mu.KLogger + +class ViewXformer( + private val ctx: PackageContext, + completeHeaders: List, + preprocessedDetails: Importer.Results, + private val logger: KLogger, +) : ContainerXformer( + ctx = ctx, + completeHeaders = completeHeaders, + typeNameFilter = View.TYPE_NAME, + preprocessedDetails = preprocessedDetails, + logger = logger, + ) diff --git a/sdk/src/main/java/com/atlan/api/ApiTokensEndpoint.java b/sdk/src/main/java/com/atlan/api/ApiTokensEndpoint.java index 7b525d99bc..a2d2874deb 100644 --- a/sdk/src/main/java/com/atlan/api/ApiTokensEndpoint.java +++ b/sdk/src/main/java/com/atlan/api/ApiTokensEndpoint.java @@ -31,6 +31,7 @@ */ public class ApiTokensEndpoint extends HeraclesEndpoint { + private static final long MAX_VALIDITY = 157680000L; private static final String endpoint = "/apikeys"; public ApiTokensEndpoint(AtlanClient client) { @@ -298,10 +299,10 @@ public ApiTokenRequest(String displayName, String description, Set perso if (validitySeconds != null) { if (validitySeconds < 0) { // Treat negative numbers as "infinite" (never expire) - this.validitySeconds = 409968000L; + this.validitySeconds = MAX_VALIDITY; } else { // Otherwise use "infinite" as the ceiling for values - this.validitySeconds = Math.min(validitySeconds, 409968000L); + this.validitySeconds = Math.min(validitySeconds, MAX_VALIDITY); } } }