From 882a38d1fbd0e1d72d73c48bd171520871541712 Mon Sep 17 00:00:00 2001 From: Christopher Grote Date: Tue, 28 Nov 2023 23:08:37 +0000 Subject: [PATCH] Adds parallel asset batching and refactors CSV loaders for reuse Signed-off-by: Christopher Grote --- gradle/libs.versions.toml | 2 + package-toolkit/runtime/build.gradle.kts | 2 + .../kotlin/com/atlan/pkg/serde/FieldSerde.kt | 4 +- .../atlan/pkg/serde/cell/AssetRefXformer.kt | 4 +- .../atlan/pkg/serde/csv}/AssetGenerator.kt | 10 +- .../com/atlan/pkg/serde/csv/CSVImporter.kt | 174 ++++++++++++ .../com/atlan/pkg/serde/csv}/CSVReader.kt | 140 +++++----- .../packages/asset-import/build.gradle.kts | 26 +- .../com/atlan/pkg/aim/AssetGenerator.kt | 61 ----- .../kotlin/com/atlan/pkg/aim/AssetImporter.kt | 69 +---- .../kotlin/com/atlan/pkg/aim/CSVReader.kt | 256 ------------------ .../com/atlan/pkg/aim/CategoryImporter.kt | 36 +-- .../kotlin/com/atlan/pkg/aim/GTCImporter.kt | 122 +++------ .../com/atlan/pkg/aim/GlossaryImporter.kt | 15 +- .../main/kotlin/com/atlan/pkg/aim/Importer.kt | 82 +----- .../kotlin/com/atlan/pkg/aim/TermImporter.kt | 48 +--- .../src/main/kotlin/DuplicateDetector.kt | 3 +- .../build.gradle.kts | 26 +- .../kotlin/com/atlan/pkg/rab/AssetImporter.kt | 106 ++------ .../main/kotlin/com/atlan/pkg/rab/Importer.kt | 79 +----- .../java/com/atlan/util/ParallelBatch.java | 214 +++++++++++++++ sdk/src/test/resources/logback-test.xml | 12 - 22 files changed, 577 insertions(+), 914 deletions(-) rename {samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab => package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/serde/csv}/AssetGenerator.kt (83%) create mode 100644 package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/serde/csv/CSVImporter.kt rename {samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab => package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/serde/csv}/CSVReader.kt (66%) delete mode 100644 samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/AssetGenerator.kt delete mode 100644 samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/CSVReader.kt create mode 100644 sdk/src/main/java/com/atlan/util/ParallelBatch.java delete mode 100644 sdk/src/test/resources/logback-test.xml diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 061f509ed4..620a09e913 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -11,6 +11,7 @@ jnanoid = "2.0.0" numaflow = "0.4.8" awssdk = "2.20.68" system-stubs = "2.1.5" +fastcsv = "2.2.2" [libraries] jackson-databind = { module = "com.fasterxml.jackson.core:jackson-databind", version.ref = "jackson" } @@ -28,6 +29,7 @@ jnanoid = { module = "com.aventrix.jnanoid:jnanoid", version.ref = "jnanoid" } numaflow-java = { module = "io.numaproj.numaflow:numaflow-java", version.ref = "numaflow" } awssdk-s3 = { module = "software.amazon.awssdk:s3", version.ref = "awssdk" } system-stubs = { module = "uk.org.webcompere:system-stubs-testng", version.ref = "system-stubs" } +fastcsv = { module = "de.siegmar:fastcsv", version.ref = "fastcsv" } [bundles] java-test = [ "jnanoid", "testng", "wiremock" ] diff --git a/package-toolkit/runtime/build.gradle.kts b/package-toolkit/runtime/build.gradle.kts index f09e2220e5..a949952e64 100644 --- a/package-toolkit/runtime/build.gradle.kts +++ b/package-toolkit/runtime/build.gradle.kts @@ -11,6 +11,7 @@ plugins { dependencies { api(libs.jackson.kotlin) + api(libs.fastcsv) // You would not need the dependencies below in reality, they are to simulate a running tenant 
testImplementation(libs.bundles.java.test) testImplementation(project(":mocks")) @@ -29,6 +30,7 @@ tasks { include(dependency("org.apache.logging.log4j:log4j-slf4j2-impl:.*")) include(dependency("org.jetbrains.kotlin:kotlin-reflect:.*")) include(dependency("com.fasterxml.jackson.module:jackson-module-kotlin:.*")) + include(dependency("de.siegmar:fastcsv:.*")) } mergeServiceFiles() } diff --git a/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/serde/FieldSerde.kt b/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/serde/FieldSerde.kt index 605a14024b..86e1692ba7 100644 --- a/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/serde/FieldSerde.kt +++ b/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/serde/FieldSerde.kt @@ -10,6 +10,7 @@ import com.atlan.pkg.serde.cell.CellXformer import com.atlan.serde.Serde import mu.KLogger import java.lang.reflect.Method +import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.atomic.AtomicBoolean /** @@ -118,6 +119,7 @@ object FieldSerde { fun getBuilderForType(typeName: String): Asset.AssetBuilder<*, *> { val assetClass = Serde.getAssetClassForType(typeName) val method = assetClass.getMethod("_internal") - return method.invoke(null) as Asset.AssetBuilder<*, *> + return (method.invoke(null) as Asset.AssetBuilder<*, *>) + .guid("-" + ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE - 1)) } } diff --git a/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/serde/cell/AssetRefXformer.kt b/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/serde/cell/AssetRefXformer.kt index f81a01a208..5d1b6f471f 100644 --- a/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/serde/cell/AssetRefXformer.kt +++ b/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/serde/cell/AssetRefXformer.kt @@ -12,7 +12,7 @@ import com.atlan.model.assets.Readme import com.atlan.pkg.Utils import com.atlan.pkg.cache.LinkCache import com.atlan.serde.Serde -import com.atlan.util.AssetBatch +import com.atlan.util.ParallelBatch import mu.KLogger import java.util.concurrent.atomic.AtomicLong @@ -97,7 +97,7 @@ object AssetRefXformer { fun buildRelated( from: Asset, relatedAssets: Map>, - batch: AssetBatch, + batch: ParallelBatch, count: AtomicLong, totalRelated: AtomicLong, logger: KLogger, diff --git a/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/AssetGenerator.kt b/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/serde/csv/AssetGenerator.kt similarity index 83% rename from samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/AssetGenerator.kt rename to package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/serde/csv/AssetGenerator.kt index e74774173e..c2b3c178fc 100644 --- a/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/AssetGenerator.kt +++ b/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/serde/csv/AssetGenerator.kt @@ -1,6 +1,6 @@ /* SPDX-License-Identifier: Apache-2.0 Copyright 2023 Atlan Pte. Ltd. 
 */
-package com.atlan.pkg.rab
+package com.atlan.pkg.serde.csv

 import com.atlan.model.assets.Asset
 import com.atlan.pkg.serde.RowDeserialization
@@ -15,11 +15,12 @@ interface AssetGenerator {
      *
      * @param row the row of values from which to build the asset
      * @param header list of field names in the same order as columns in the tabular data
-     * @param typeName index of the typeName column
+     * @param typeIdx numeric index within the columns of the typeName field
+     * @param qnIdx numeric index within the columns of the qualifiedName field
      * @param skipColumns columns to skip, i.e. that need to be handled in a later pass
      * @return the asset(s) built from the values on the row
      */
-    fun buildFromRow(row: List<String>, header: List<String>, typeName: Int, skipColumns: Set<String>): RowDeserialization?
+    fun buildFromRow(row: List<String>, header: List<String>, typeIdx: Int, qnIdx: Int, skipColumns: Set<String>): RowDeserialization?

     /**
      * Check whether to include this row as part of the processing (true) or not (false).
      *
      * @param row of values
      * @param header column names
      * @param typeIdx index of the typeName
+     * @param qnIdx index of the qualifiedName
      * @return true if the row should be included in the import, or false if not
      */
-    fun includeRow(row: List<String>, header: List<String>, typeIdx: Int): Boolean
+    fun includeRow(row: List<String>, header: List<String>, typeIdx: Int, qnIdx: Int): Boolean

     /**
      * Start a builder for the asset on this row.
diff --git a/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/serde/csv/CSVImporter.kt b/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/serde/csv/CSVImporter.kt
new file mode 100644
index 0000000000..4879901a9b
--- /dev/null
+++ b/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/serde/csv/CSVImporter.kt
@@ -0,0 +1,174 @@
+/* SPDX-License-Identifier: Apache-2.0
+   Copyright 2023 Atlan Pte. Ltd. */
+package com.atlan.pkg.serde.csv
+
+import com.atlan.cache.ReflectionCache
+import com.atlan.model.assets.Asset
+import com.atlan.model.fields.AtlanField
+import com.atlan.model.fields.SearchableField
+import com.atlan.pkg.serde.RowDeserialization
+import com.atlan.pkg.serde.RowDeserializer
+import com.atlan.serde.Serde
+import mu.KLogger
+import java.lang.reflect.InvocationTargetException
+import kotlin.system.exitProcess
+
+/**
+ * Import assets into Atlan from a provided CSV file.
+ *
+ * Only the assets and attributes in the provided CSV file will be loaded.
+ * By default, any blank values in a cell in the CSV file will be ignored. If you would like any
+ * particular column's blank values to actually overwrite (i.e. remove) existing values for that
+ * asset in Atlan, then pass that column's field in attrsToOverwrite.
+ *
+ * @param filename name of the file to import
+ * @param logger through which to record progress and any errors
+ * @param typeNameFilter name of the types that should be processed (primarily useful for multi-pass loads)
+ * @param attrsToOverwrite list of fields that should be overwritten in Atlan, if their value is empty in the CSV
+ * @param updateOnly if true, only update assets (first checking that they exist); if false, allow upserts (creating any that do not exist)
+ * @param batchSize maximum number of records to save per API request
+ */
+abstract class CSVImporter(
+    private val filename: String,
+    protected val logger: KLogger,
+    protected val typeNameFilter: String = "",
+    private val attrsToOverwrite: List<AtlanField> = listOf(),
+    private val updateOnly: Boolean = false,
+    private val batchSize: Int = 20,
+) : AssetGenerator {
+
+    /**
+     * Actually run the import.
+     *
+     * @param columnsToSkip (optional) columns in the CSV file to skip when loading (primarily useful for multi-pass loads)
+     */
+    open fun import(columnsToSkip: Set<String> = setOf()) {
+        CSVReader(filename, updateOnly).use { csv ->
+            val start = System.currentTimeMillis()
+            val anyFailures = csv.streamRows(this, batchSize, logger, columnsToSkip)
+            logger.info { "Total time taken: ${System.currentTimeMillis() - start} ms" }
+            if (anyFailures) {
+                logger.error { "Some errors detected, failing the workflow." }
+                exitProcess(1)
+            }
+            cacheCreated(csv.created)
+        }
+    }
+
+    /** {@inheritDoc} */
+    override fun buildFromRow(row: List<String>, header: List<String>, typeIdx: Int, qnIdx: Int, skipColumns: Set<String>): RowDeserialization? {
+        // Deserialize the objects represented in that row (could be more than one, due to the
+        // flattening in particular of things like READMEs and Links)
+        if (includeRow(row, header, typeIdx, qnIdx)) {
+            val typeName = typeNameFilter.ifBlank {
+                row.getOrElse(typeIdx) { "" }
+            }
+            val qualifiedName = row.getOrElse(qnIdx) { "" }
+            val deserializer = RowDeserializer(
+                heading = header,
+                row = row,
+                typeIdx = typeIdx,
+                qnIdx = qnIdx,
+                typeName = typeName,
+                qualifiedName = qualifiedName,
+                logger = logger,
+                skipColumns = skipColumns,
+            )
+            val assets = deserializer.getAssets(getBuilder(deserializer))
+            if (assets != null) {
+                val builder = assets.primary
+                val candidate = builder.build()
+                val identity = RowDeserialization.AssetIdentity(candidate.typeName, candidate.qualifiedName)
+                // Then apply any field clearances based on attributes configured in the job
+                for (field in attrsToOverwrite) {
+                    clearField(field, candidate, builder)
+                    // If the field has no related assets in this row, mark it to be cleared
+                    if (!assets.related.containsKey(field.atlanFieldName)) {
+                        assets.delete.add(field)
+                    }
+                }
+                return RowDeserialization(identity, builder, assets.related, assets.delete)
+            }
+        }
+        return null
+    }
+
+    /** {@inheritDoc} */
+    override fun includeRow(row: List<String>, header: List<String>, typeIdx: Int, qnIdx: Int): Boolean {
+        return row[typeIdx] == typeNameFilter
+    }
+
+    /**
+     * Check if the provided field should be cleared, and if so clear it.
+     *
+     * @param field to check if it is empty and should be cleared
+     * @param candidate the asset on which to check whether the field is empty (or not)
+     * @param builder the builder against which to clear the field
+     * @return true if the field was cleared, false otherwise
+     */
+    internal fun clearField(field: AtlanField, candidate: Asset, builder: Asset.AssetBuilder<*, *>): Boolean {
+        try {
+            val getter = ReflectionCache.getGetter(
+                Serde.getAssetClassForType(candidate.typeName),
+                field.atlanFieldName,
+            )
+            val value = getter.invoke(candidate)
+            if (value == null ||
+                (Collection::class.java.isAssignableFrom(value.javaClass) && (value as Collection<*>).isEmpty())
+            ) {
+                builder.nullField(field.atlanFieldName)
+                return true
+            }
+        } catch (e: ClassNotFoundException) {
+            logger.error(
+                "Unknown type {} — cannot clear {}.",
+                candidate.typeName,
+                field.atlanFieldName,
+                e,
+            )
+        } catch (e: IllegalAccessException) {
+            logger.error(
+                "Unable to clear {} on: {}::{}",
+                field.atlanFieldName,
+                candidate.typeName,
+                candidate.qualifiedName,
+                e,
+            )
+        } catch (e: InvocationTargetException) {
+            logger.error(
+                "Unable to clear {} on: {}::{}",
+                field.atlanFieldName,
+                candidate.typeName,
+                candidate.qualifiedName,
+                e,
+            )
+        }
+        return false
+    }
+
+    companion object {
+        /**
+         * Determine which (if any) attributes should be cleared (removed) if they are empty in the input file.
+         *
+         * @param attrNames the list of attribute names provided through the configuration
+         * @param fileInfo a descriptor to qualify for which file the attributes are being set
+         * @param logger through which to record information
+         * @return parsed list of attribute names to be cleared
+         */
+        fun attributesToClear(attrNames: MutableList<String>, fileInfo: String, logger: KLogger): List<AtlanField> {
+            if (attrNames.contains(Asset.CERTIFICATE_STATUS.atlanFieldName)) {
+                attrNames.add(Asset.CERTIFICATE_STATUS_MESSAGE.atlanFieldName)
+            }
+            if (attrNames.contains(Asset.ANNOUNCEMENT_TYPE.atlanFieldName)) {
+                attrNames.add(Asset.ANNOUNCEMENT_TITLE.atlanFieldName)
+                attrNames.add(Asset.ANNOUNCEMENT_MESSAGE.atlanFieldName)
+            }
+            logger.info { "Adding attributes to be cleared, if blank (for $fileInfo): $attrNames" }
+            val attrFields = mutableListOf<AtlanField>()
+            for (name in attrNames) {
+                attrFields.add(SearchableField(name, name))
+            }
+            return attrFields
+        }
+    }
+}
diff --git a/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/CSVReader.kt b/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/serde/csv/CSVReader.kt
similarity index 66%
rename from samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/CSVReader.kt
rename to package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/serde/csv/CSVReader.kt
index 8d19d8c7ad..ab4b749194 100644
--- a/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/CSVReader.kt
+++ b/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/serde/csv/CSVReader.kt
@@ -1,6 +1,6 @@
 /* SPDX-License-Identifier: Apache-2.0
    Copyright 2023 Atlan Pte. Ltd. */
-package com.atlan.pkg.rab
+package com.atlan.pkg.serde.csv

 import com.atlan.Atlan
 import com.atlan.cache.ReflectionCache
@@ -11,6 +11,7 @@ import com.atlan.model.fields.AtlanField
 import com.atlan.pkg.Utils
 import com.atlan.pkg.serde.cell.AssetRefXformer
 import com.atlan.util.AssetBatch
+import com.atlan.util.ParallelBatch
 import de.siegmar.fastcsv.reader.CsvReader
 import de.siegmar.fastcsv.reader.CsvRow
 import mu.KLogger
@@ -37,6 +38,7 @@ class CSVReader @JvmOverloads constructor(
     private val counter: CsvReader
     private val header: List<String>
     private val typeIdx: Int
+    private val qualifiedNameIdx: Int

     val created: ConcurrentHashMap<String, Asset>

@@ -54,6 +56,7 @@
                 .orElse(emptyList())
         }
         typeIdx = header.indexOf(Asset.TYPE_NAME.atlanFieldName)
+        qualifiedNameIdx = header.indexOf(Asset.QUALIFIED_NAME.atlanFieldName)
         if (typeIdx < 0) {
             throw IOException(
                 "Unable to find the column 'typeName'. This is a mandatory column in the input CSV.",
@@ -77,16 +80,29 @@
      * @return true if all rows were processed successfully, or false if there were any failures
      */
     fun streamRows(rowToAsset: AssetGenerator, batchSize: Int, logger: KLogger, skipColumns: Set<String> = setOf()): Boolean {
-        // Note that for proper parallelism we need to manage a separate AssetBatch per thread
-        val batchMap: MutableMap<Long, AssetBatch> = ConcurrentHashMap()
-        val relatedMap: MutableMap<Long, AssetBatch> = ConcurrentHashMap()
-        val relatedHolds: MutableMap<Long, ConcurrentHashMap<String, RelatedAssetHold>> = ConcurrentHashMap()
-        val deferDeletes: MutableMap<Long, ConcurrentHashMap<String, Set<AtlanField>>> = ConcurrentHashMap()
+        val client = Atlan.getDefaultClient()
+        val primaryBatch = ParallelBatch(
+            client,
+            batchSize,
+            true,
+            AssetBatch.CustomMetadataHandling.MERGE,
+            true,
+            updateOnly,
+        )
+        val relatedBatch = ParallelBatch(
+            client,
+            batchSize,
+            true,
+            AssetBatch.CustomMetadataHandling.MERGE,
+            true,
+        )
+        val relatedHolds: MutableMap<String, RelatedAssetHold> = ConcurrentHashMap()
+        val deferDeletes: MutableMap<String, Set<AtlanField>> = ConcurrentHashMap()
         var someFailure = false

         val filteredRowCount = AtomicLong(0)
         counter.stream().skip(1).parallel().forEach { row ->
-            if (rowToAsset.includeRow(row.fields, header, typeIdx)) {
+            if (rowToAsset.includeRow(row.fields, header, typeIdx, qualifiedNameIdx)) {
                 filteredRowCount.incrementAndGet()
             }
         }
@@ -95,97 +111,63 @@
         logger.info { "Loading a total of $totalRowCount assets..." }
         val count = AtomicLong(0)
         reader.stream().skip(1).parallel().forEach { r: CsvRow ->
-            val id = Thread.currentThread().id
-            if (!batchMap.containsKey(id)) {
-                // Initialize a new AssetBatch for each parallel thread
-                batchMap[id] = AssetBatch(
-                    Atlan.getDefaultClient(),
-                    batchSize,
-                    true,
-                    AssetBatch.CustomMetadataHandling.MERGE,
-                    true,
-                    updateOnly,
-                )
-                relatedHolds[id] = ConcurrentHashMap()
-                deferDeletes[id] = ConcurrentHashMap()
-                relatedMap[id] = AssetBatch(
-                    Atlan.getDefaultClient(),
-                    batchSize,
-                    true,
-                    AssetBatch.CustomMetadataHandling.MERGE,
-                    true,
-                )
-            }
-            val localBatch = batchMap[id]
-            val assets = rowToAsset.buildFromRow(r.fields, header, typeIdx, skipColumns)
+            val assets = rowToAsset.buildFromRow(r.fields, header, typeIdx, qualifiedNameIdx, skipColumns)
             if (assets != null) {
                 try {
                     val asset = assets.primary.build()
-                    localBatch!!.add(asset)
+                    primaryBatch.add(asset)
                     Utils.logProgress(count, totalRowCount, logger, batchSize)
                     if (assets.related.isNotEmpty()) {
-                        relatedHolds[id]!![asset.guid] = RelatedAssetHold(asset, assets.related)
+                        relatedHolds[asset.guid] = RelatedAssetHold(asset, assets.related)
                     }
                     if (assets.delete.isNotEmpty()) {
-                        deferDeletes[id]!![asset.guid] = assets.delete
+                        deferDeletes[asset.guid] = assets.delete
                    }
                 } catch (e: AtlanException) {
                     logger.error("Unable to load batch.", e)
                 }
             }
         }
+        primaryBatch.flush()
+        primaryBatch.created.forEach { asset ->
+            created[asset.guid] = asset
+        }
+        val totalCreates = primaryBatch.created.size
+        val totalUpdates = primaryBatch.updated.size
+        val totalSkipped = primaryBatch.skipped.size
+        val totalFailures = AtomicLong(0)
+        someFailure = someFailure || primaryBatch.failures.isNotEmpty()
+        logFailures(primaryBatch, logger, totalFailures)
+        logger.info { "Total assets created: $totalCreates" }
+        logger.info { "Total assets updated: $totalUpdates" }
+        logger.info { "Total assets skipped: $totalSkipped" }
+        logger.info { "Total assets failed : $totalFailures" }

         // Step 2: load the deferred related assets (and final-flush the main asset batches, too)
-        val totalCreates = AtomicLong(0)
-        val totalUpdates = AtomicLong(0)
-        val totalSkipped = AtomicLong(0)
-        val totalFailures = AtomicLong(0)
         val totalRelated = AtomicLong(0)
         val searchAndDelete = mutableMapOf<String, Set<AtlanField>>()
-        relatedHolds.values.forEach { a -> a.values.forEach { b -> totalRelated.getAndAdd(b.relatedMap.size.toLong()) } }
+        relatedHolds.values.forEach { b -> totalRelated.getAndAdd(b.relatedMap.size.toLong()) }
         logger.info { "Processing $totalRelated total related assets in a second pass." }
-        batchMap.entries.parallelStream().forEach { entry: MutableMap.MutableEntry<Long, AssetBatch> ->
-            val threadId = entry.key
-            val batch = entry.value
-            val relatedBatch = relatedMap[threadId]!!
-            batch.flush()
-            batch.created.forEach { asset ->
-                created[asset.guid] = asset
-            }
-            totalCreates.getAndAdd(batch.created.size.toLong())
-            totalUpdates.getAndAdd(batch.updated.size.toLong())
-            totalSkipped.getAndAdd(batch.skipped.size.toLong())
-            someFailure = someFailure || batch.failures.isNotEmpty()
-            logFailures(batch, logger, totalFailures)
-            for (hold in relatedHolds[threadId]!!) {
-                val placeholderGuid = hold.key
-                val relatedAssetHold = hold.value
-                val resolvedGuid = batch.resolvedGuids[placeholderGuid]
-                val resolvedAsset = relatedAssetHold.fromAsset.toBuilder().guid(resolvedGuid).build() as Asset
-                AssetRefXformer.buildRelated(resolvedAsset, relatedAssetHold.relatedMap, relatedBatch, count, totalRelated, logger, batchSize)
-            }
-            for (delete in deferDeletes[threadId]!!) {
-                val placeholderGuid = delete.key
-                val resolvedGuid = batch.resolvedGuids[placeholderGuid]!!
-                searchAndDelete[resolvedGuid] = delete.value
-            }
+        relatedHolds.entries.parallelStream().forEach { hold: MutableMap.MutableEntry<String, RelatedAssetHold> ->
+            val placeholderGuid = hold.key
+            val relatedAssetHold = hold.value
+            val resolvedGuid = primaryBatch.resolvedGuids[placeholderGuid]
+            val resolvedAsset = relatedAssetHold.fromAsset.toBuilder().guid(resolvedGuid).build() as Asset
+            AssetRefXformer.buildRelated(resolvedAsset, relatedAssetHold.relatedMap, relatedBatch, count, totalRelated, logger, batchSize)
+        }
+        deferDeletes.entries.parallelStream().forEach { delete: MutableMap.MutableEntry<String, Set<AtlanField>> ->
+            val placeholderGuid = delete.key
+            val resolvedGuid = primaryBatch.resolvedGuids[placeholderGuid]!!
+            searchAndDelete[resolvedGuid] = delete.value
         }
-        logger.info { "Total assets created: $totalCreates" }
-        logger.info { "Total assets updated: $totalUpdates" }
-        logger.info { "Total assets skipped: $totalSkipped" }
-        logger.info { "Total assets failed : $totalFailures" }

         // Step 3: final-flush the deferred related assets
-        val totalCreatesR = AtomicLong(0)
-        val totalUpdatesR = AtomicLong(0)
+        relatedBatch.flush()
+        val totalCreatesR = relatedBatch.created.size
+        val totalUpdatesR = relatedBatch.updated.size
         val totalFailuresR = AtomicLong(0)
-        relatedMap.values.parallelStream().forEach { b ->
-            b.flush()
-            totalCreatesR.getAndAdd(b.created.size.toLong())
-            totalUpdatesR.getAndAdd(b.updated.size.toLong())
-            someFailure = someFailure || b.failures.isNotEmpty()
-            logFailures(b, logger, totalFailuresR)
-        }
+        someFailure = someFailure || relatedBatch.failures.isNotEmpty()
+        logFailures(relatedBatch, logger, totalFailuresR)
         logger.info { "Total related assets created: $totalCreatesR" }
         logger.info { "Total related assets updated: $totalUpdatesR" }
         logger.info { "Total related assets failed : $totalFailuresR" }
@@ -198,7 +180,7 @@
         searchAndDelete.entries.parallelStream().forEach { entry ->
             val guid = entry.key
             val fields = entry.value
-            Atlan.getDefaultClient().assets.select()
+            client.assets.select()
                 .where(Asset.GUID.eq(guid))
                 .includesOnResults(fields)
                 .stream()
@@ -218,7 +200,7 @@
                     }
                 }
                 if (guids.isNotEmpty()) {
-                    val response = Atlan.getDefaultClient().assets.delete(guids, AtlanDeleteType.SOFT)
+                    val response = client.assets.delete(guids, AtlanDeleteType.SOFT)
                     totalDeleted.getAndAdd(response.deletedAssets.size.toLong())
                 }
             }
@@ -228,7 +210,7 @@
         return someFailure
     }

-    private fun logFailures(b: AssetBatch, logger: KLogger, totalFailures: AtomicLong) {
+    private fun logFailures(b: ParallelBatch, logger: KLogger, totalFailures: AtomicLong) {
         if (b.failures.isNotEmpty()) {
             for (f in b.failures) {
                 logger.info { "Failed batch reason: ${f.failureReason}" }
diff --git a/samples/packages/asset-import/build.gradle.kts b/samples/packages/asset-import/build.gradle.kts
index a41ebb5eca..95bba03964 100644
--- a/samples/packages/asset-import/build.gradle.kts
+++ b/samples/packages/asset-import/build.gradle.kts
@@ -1,28 +1,4 @@
-val jarPath = "$rootDir/jars"
-
+/* SPDX-License-Identifier: Apache-2.0 */
 plugins {
     id("com.atlan.kotlin-custom-package")
-    alias(libs.plugins.shadow)
-}
-
-dependencies {
-    implementation("de.siegmar:fastcsv:2.2.2")
-}
-
-tasks {
-    shadowJar {
-        isZip64 = true
-        destinationDirectory.set(file(jarPath))
-        dependencies {
-            include(dependency("de.siegmar:fastcsv:.*"))
-        }
-
mergeServiceFiles() - } - - jar { - // Override the default jar task so we get the shadowed jar - // as the only jar output - actions = listOf() - doLast { shadowJar } - } } diff --git a/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/AssetGenerator.kt b/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/AssetGenerator.kt deleted file mode 100644 index c85c45795c..0000000000 --- a/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/AssetGenerator.kt +++ /dev/null @@ -1,61 +0,0 @@ -/* SPDX-License-Identifier: Apache-2.0 - Copyright 2023 Atlan Pte. Ltd. */ -package com.atlan.pkg.aim - -import com.atlan.model.assets.Asset -import com.atlan.pkg.serde.RowDeserialization -import com.atlan.pkg.serde.cell.AssetRefXformer -import com.atlan.util.AssetBatch -import mu.KLogger -import java.util.concurrent.atomic.AtomicLong - -/** - * Interface to generate an asset object from a row of string values. - */ -interface AssetGenerator { - /** - * Build an asset object from the string values of a row of tabular data. - * - * @param row the row of values from which to build the asset - * @param header list of field names in the same order as columns in the tabular data - * @param typeIdx numeric index within the columns of the typeName field - * @param qnIdx numeric index within the columns of the qualifiedName field - * @param skipColumns columns to skip, i.e. that need to be handled in a later pass - * @return the asset(s) built from the values on the row - */ - fun buildFromRow(row: List, header: List, typeIdx: Int, qnIdx: Int, skipColumns: Set): RowDeserialization? - - /** - * Batch up a complete related asset object from the provided asset and (partial) related asset details. - * - * @param from the asset to which another asset is to be related (should have at least its GUID and name) - * @param relatedAssets the (partial) asset(s) that should be related to the asset, which needs to be completed - * @param batch the batch through which to create the asset(s) / relationships - * @param count the running count of how many relationships have been created - * @param totalRelated the static total number of relationships anticipated - * @param logger through which to log progress - * @param batchSize maximum number of relationships / assets to create per API call - */ - fun batchRelated( - from: Asset, - relatedAssets: Map>, - batch: AssetBatch, - count: AtomicLong, - totalRelated: AtomicLong, - logger: KLogger, - batchSize: Int, - ) { - AssetRefXformer.buildRelated(from, relatedAssets, batch, count, totalRelated, logger, batchSize) - } - - /** - * Check whether to include this row as part of the processing (true) or not (false). - * - * @param row of values - * @param header column names - * @param typeIdx index of the typeName - * @param qnIdx index of the qualifiedName - * @return true if the row should be included in the import, or false if not - */ - fun includeRow(row: List, header: List, typeIdx: Int, qnIdx: Int): Boolean -} diff --git a/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/AssetImporter.kt b/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/AssetImporter.kt index 0b6c91dfb8..068cf3cb8f 100644 --- a/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/AssetImporter.kt +++ b/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/AssetImporter.kt @@ -2,12 +2,12 @@ Copyright 2023 Atlan Pte. Ltd. 
*/ package com.atlan.pkg.aim +import com.atlan.model.assets.Asset import com.atlan.model.fields.AtlanField -import com.atlan.pkg.aim.Importer.clearField -import com.atlan.pkg.serde.RowDeserialization +import com.atlan.pkg.serde.FieldSerde import com.atlan.pkg.serde.RowDeserializer +import com.atlan.pkg.serde.csv.CSVImporter import mu.KotlinLogging -import kotlin.system.exitProcess /** * Import assets into Atlan from a provided CSV file. @@ -27,58 +27,17 @@ class AssetImporter( private val attrsToOverwrite: List, private val updateOnly: Boolean, private val batchSize: Int, -) : AssetGenerator { - private val logger = KotlinLogging.logger {} - - fun import() { - CSVReader(filename, updateOnly).use { csv -> - val start = System.currentTimeMillis() - val anyFailures = csv.streamRows(this, batchSize, logger) - logger.info { "Total time taken: ${System.currentTimeMillis() - start} ms" } - if (anyFailures) { - logger.error { "Some errors detected, failing the workflow." } - exitProcess(1) - } - } - } - - /** - * Translate a row of CSV values into an asset object, overwriting any attributes that were empty - * in the CSV with blank values, per the job configuration. - * - * @param row of values in the CSV - * @param header names of columns (and their position) in the header of the CSV - * @param typeIdx numeric index of the column containing the typeName of the asset in the row - * @param qnIdx numeric index of the column containing the qualifiedName of the asset in the row - * @param skipColumns columns to skip, i.e. that need to be processed in a later pass - * @return the deserialized asset object(s) - */ - override fun buildFromRow(row: List, header: List, typeIdx: Int, qnIdx: Int, skipColumns: Set): RowDeserialization? { - // Deserialize the objects represented in that row (could be more than one due to flattening - // of in particular things like READMEs and Links) - val assets = RowDeserializer( - heading = header, - row = row, - typeIdx = typeIdx, - qnIdx = qnIdx, - logger = logger, - skipColumns = skipColumns, - ).getAssets() - if (assets != null) { - val builder = assets.primary - val candidate = builder.build() - val identity = RowDeserialization.AssetIdentity(candidate.typeName, candidate.qualifiedName) - // Then apply any field clearances based on attributes configured in the job - for (field in attrsToOverwrite) { - clearField(field, candidate, builder) - // If there are no related assets - if (!assets.related.containsKey(field.atlanFieldName)) { - assets.delete.add(field) - } - } - return RowDeserialization(identity, builder, assets.related, assets.delete) - } - return null +) : CSVImporter( + filename, + logger = KotlinLogging.logger {}, + attrsToOverwrite = attrsToOverwrite, + updateOnly = updateOnly, + batchSize = batchSize, +) { + /** {@inheritDoc} */ + override fun getBuilder(deserializer: RowDeserializer): Asset.AssetBuilder<*, *> { + val typeName = deserializer.getValue(Asset.TYPE_NAME.atlanFieldName)?.let { it as String } ?: "" + return FieldSerde.getBuilderForType(typeName) } /** {@inheritDoc} */ diff --git a/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/CSVReader.kt b/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/CSVReader.kt deleted file mode 100644 index 6009c18218..0000000000 --- a/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/CSVReader.kt +++ /dev/null @@ -1,256 +0,0 @@ -/* SPDX-License-Identifier: Apache-2.0 - Copyright 2023 Atlan Pte. Ltd. 
*/ -package com.atlan.pkg.aim - -import com.atlan.Atlan -import com.atlan.cache.ReflectionCache -import com.atlan.exception.AtlanException -import com.atlan.model.assets.Asset -import com.atlan.model.enums.AtlanDeleteType -import com.atlan.model.fields.AtlanField -import com.atlan.pkg.Utils -import com.atlan.util.AssetBatch -import de.siegmar.fastcsv.reader.CsvReader -import de.siegmar.fastcsv.reader.CsvRow -import mu.KLogger -import java.io.Closeable -import java.io.IOException -import java.nio.file.Paths -import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.atomic.AtomicLong - -/** - * Utility class for reading from CSV files, using FastCSV. - * - * @param path location and filename of the CSV file to read - * @param updateOnly when true, the reader will first look up assets to ensure they exist (and only update them, never create) - * @param fieldSeparator character to use to separate fields (for example ',' or ';') - */ -class CSVReader @JvmOverloads constructor( - path: String, - private val updateOnly: Boolean, - fieldSeparator: Char = ',', -) : Closeable { - - private val reader: CsvReader - private val counter: CsvReader - private val header: List - private val typeIdx: Int - private val qualifiedNameIdx: Int - - val created: ConcurrentHashMap - - init { - val inputFile = Paths.get(path) - val builder = CsvReader.builder() - .fieldSeparator(fieldSeparator) - .quoteCharacter('"') - .skipEmptyRows(true) - .errorOnDifferentFieldCount(true) - builder.build(inputFile).use { tmp -> - val one = tmp.stream().findFirst() - header = - one.map { obj: CsvRow -> obj.fields } - .orElse(emptyList()) - } - typeIdx = header.indexOf(Asset.TYPE_NAME.atlanFieldName) - qualifiedNameIdx = header.indexOf(Asset.QUALIFIED_NAME.atlanFieldName) - if (typeIdx < 0 || qualifiedNameIdx < 0) { - throw IOException( - "Unable to find either (or both) the columns 'typeName' and / or 'qualifiedName'. These are both mandatory columns in the input CSV.", - ) - } - created = ConcurrentHashMap() - reader = builder.build(inputFile) - counter = builder.build(inputFile) - } - - /** - * Parallel-read the CSV file into batched asset updates against Atlan. - * Note: this requires the input CSV file to be fully parallel-loadable without any - * conflicts. That means: every row is a unique asset, no two rows update any relationship - * attribute that points at the same related asset (such as an assigned term). - * - * @param rowToAsset translator from a row of CSV values to an asset object - * @param batchSize maximum number of Assets to bulk-save in Atlan per API request - * @param logger through which to report the overall progress - * @param skipColumns columns to skip during the processing (i.e. 
where they need to be processed in a later pass) - * @return true if all rows were processed successfully, or false if there were any failures - */ - fun streamRows(rowToAsset: AssetGenerator, batchSize: Int, logger: KLogger, skipColumns: Set = setOf()): Boolean { - // Note that for proper parallelism we need to manage a separate AssetBatch per thread - val batchMap: MutableMap = ConcurrentHashMap() - val relatedMap: MutableMap = ConcurrentHashMap() - val relatedHolds: MutableMap> = ConcurrentHashMap() - val deferDeletes: MutableMap>> = ConcurrentHashMap() - var someFailure = false - - val filteredRowCount = AtomicLong(0) - counter.stream().skip(1).parallel().forEach { row -> - if (rowToAsset.includeRow(row.fields, header, typeIdx, qualifiedNameIdx)) { - filteredRowCount.incrementAndGet() - } - } - val totalRowCount = filteredRowCount.get() - // Step 1: load the main assets - logger.info { "Loading a total of $totalRowCount assets..." } - val count = AtomicLong(0) - reader.stream().skip(1).parallel().forEach { r: CsvRow -> - val id = Thread.currentThread().id - if (!batchMap.containsKey(id)) { - // Initialize a new AssetBatch for each parallel thread - batchMap[id] = AssetBatch( - Atlan.getDefaultClient(), - batchSize, - true, - AssetBatch.CustomMetadataHandling.MERGE, - true, - updateOnly, - ) - relatedHolds[id] = ConcurrentHashMap() - deferDeletes[id] = ConcurrentHashMap() - relatedMap[id] = AssetBatch( - Atlan.getDefaultClient(), - batchSize, - true, - AssetBatch.CustomMetadataHandling.MERGE, - true, - ) - } - val localBatch = batchMap[id] - val assets = rowToAsset.buildFromRow(r.fields, header, typeIdx, qualifiedNameIdx, skipColumns) - if (assets != null) { - try { - val asset = assets.primary.build() - localBatch!!.add(asset) - Utils.logProgress(count, totalRowCount, logger, batchSize) - if (assets.related.isNotEmpty()) { - relatedHolds[id]!![asset.guid] = RelatedAssetHold(asset, assets.related) - } - if (assets.delete.isNotEmpty()) { - deferDeletes[id]!![asset.guid] = assets.delete - } - } catch (e: AtlanException) { - logger.error("Unable to load batch.", e) - } - } - } - - // Step 2: load the deferred related assets (and final-flush the main asset batches, too) - val totalCreates = AtomicLong(0) - val totalUpdates = AtomicLong(0) - val totalSkipped = AtomicLong(0) - val totalFailures = AtomicLong(0) - val totalRelated = AtomicLong(0) - val searchAndDelete = mutableMapOf>() - relatedHolds.values.forEach { a -> a.values.forEach { b -> totalRelated.getAndAdd(b.relatedMap.size.toLong()) } } - logger.info { "Processing $totalRelated total related assets in a second pass." } - batchMap.entries.parallelStream().forEach { entry: MutableMap.MutableEntry -> - val threadId = entry.key - val batch = entry.value - val relatedBatch = relatedMap[threadId]!! - batch.flush() - batch.created.forEach { asset -> - created[asset.guid] = asset - } - totalCreates.getAndAdd(batch.created.size.toLong()) - totalUpdates.getAndAdd(batch.updated.size.toLong()) - totalSkipped.getAndAdd(batch.skipped.size.toLong()) - someFailure = someFailure || batch.failures.isNotEmpty() - logFailures(batch, logger, totalFailures) - for (hold in relatedHolds[threadId]!!) 
{ - val placeholderGuid = hold.key - val relatedAssetHold = hold.value - val resolvedGuid = batch.resolvedGuids[placeholderGuid] - val resolvedAsset = relatedAssetHold.fromAsset.toBuilder().guid(resolvedGuid).build() as Asset - rowToAsset.batchRelated(resolvedAsset, relatedAssetHold.relatedMap, relatedBatch, count, totalRelated, logger, batchSize) - } - for (delete in deferDeletes[threadId]!!) { - val placeholderGuid = delete.key - val resolvedGuid = batch.resolvedGuids[placeholderGuid]!! - searchAndDelete[resolvedGuid] = delete.value - } - } - logger.info { "Total assets created: $totalCreates" } - logger.info { "Total assets updated: $totalUpdates" } - logger.info { "Total assets skipped: $totalSkipped" } - logger.info { "Total assets failed : $totalFailures" } - - // Step 3: final-flush the deferred related assets - val totalCreatesR = AtomicLong(0) - val totalUpdatesR = AtomicLong(0) - val totalFailuresR = AtomicLong(0) - relatedMap.values.parallelStream().forEach { b -> - b.flush() - totalCreatesR.getAndAdd(b.created.size.toLong()) - totalUpdatesR.getAndAdd(b.updated.size.toLong()) - someFailure = someFailure || b.failures.isNotEmpty() - logFailures(b, logger, totalFailuresR) - } - logger.info { "Total related assets created: $totalCreatesR" } - logger.info { "Total related assets updated: $totalUpdatesR" } - logger.info { "Total related assets failed : $totalFailuresR" } - - // Step 4: bulk-delete any related assets marked for removal - val totalToScan = searchAndDelete.size.toLong() - val totalScanned = AtomicLong(0) - val totalDeleted = AtomicLong(0) - logger.info { "Scanning $totalToScan total assets in a final pass for possible README removal." } - searchAndDelete.entries.parallelStream().forEach { entry -> - val guid = entry.key - val fields = entry.value - Atlan.getDefaultClient().assets.select() - .where(Asset.GUID.eq(guid)) - .includesOnResults(fields) - .stream() - .forEach { result -> - val guids = mutableListOf() - for (field in fields) { - val getter = ReflectionCache.getGetter(result.javaClass, field.atlanFieldName) - val reference = getter.invoke(result) - if (reference is Asset) { - guids.add(reference.guid) - } else if (reference != null && Collection::class.java.isAssignableFrom(reference.javaClass)) { - for (element in reference as Collection<*>) { - if (element is Asset) { - guids.add(element.guid) - } - } - } - } - if (guids.isNotEmpty()) { - val response = Atlan.getDefaultClient().assets.delete(guids, AtlanDeleteType.SOFT) - totalDeleted.getAndAdd(response.deletedAssets.size.toLong()) - } - } - Utils.logProgress(totalScanned, totalToScan, logger, batchSize) - } - logger.info { "Total READMEs deleted: $totalDeleted" } - return someFailure - } - - private fun logFailures(b: AssetBatch, logger: KLogger, totalFailures: AtomicLong) { - if (b.failures.isNotEmpty()) { - for (f in b.failures) { - logger.info { "Failed batch reason: ${f.failureReason}" } - totalFailures.getAndAdd(f.failedAssets.size.toLong()) - for (failed in f.failedAssets) { - logger.info { - " ... 
included asset: ${failed.typeName}::${failed.qualifiedName}" - } - } - } - } - } - - /** {@inheritDoc} */ - @Throws(IOException::class) - override fun close() { - reader.close() - } - - data class RelatedAssetHold( - val fromAsset: Asset, - val relatedMap: Map>, - ) -} diff --git a/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/CategoryImporter.kt b/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/CategoryImporter.kt index 6343cd421e..50ebd02e6b 100644 --- a/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/CategoryImporter.kt +++ b/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/CategoryImporter.kt @@ -5,12 +5,12 @@ package com.atlan.pkg.aim import com.atlan.model.assets.GlossaryCategory import com.atlan.model.fields.AtlanField import com.atlan.pkg.cache.CategoryCache +import com.atlan.pkg.serde.RowDeserializer import com.atlan.pkg.serde.cell.GlossaryCategoryXformer.CATEGORY_DELIMITER import com.atlan.pkg.serde.cell.GlossaryXformer.GLOSSARY_DELIMITER import mu.KotlinLogging import java.util.concurrent.atomic.AtomicInteger import kotlin.math.max -import kotlin.system.exitProcess /** * Import categories (only) into Atlan from a provided CSV file. @@ -46,23 +46,14 @@ class CategoryImporter( private val maxCategoryDepth = AtomicInteger(1) /** {@inheritDoc} */ - override fun import() { + override fun import(columnsToSkip: Set) { cache.preload() // Import categories by level, top-to-bottom, and stop when we hit a level with no categories logger.info { "Loading categories in multiple passes, by level..." } while (levelToProcess < maxCategoryDepth.get()) { levelToProcess += 1 logger.info { "--- Loading level $levelToProcess categories... ---" } - CSVReader(filename, updateOnly).use { csv -> - val start = System.currentTimeMillis() - val anyFailures = csv.streamRows(this, batchSize, logger) - logger.info { "Total time taken: ${System.currentTimeMillis() - start} ms" } - if (anyFailures) { - logger.error { "Some errors detected, failing the workflow." 
} - exitProcess(1) - } - cacheCreated(csv.created) - } + super.import(columnsToSkip) } } @@ -99,17 +90,18 @@ class CategoryImporter( } /** {@inheritDoc} */ - override fun getCacheId(row: List, header: List): String { - val nameIdx = header.indexOf(GlossaryCategory.NAME.atlanFieldName) - val parentIdx = header.indexOf(GlossaryCategory.PARENT_CATEGORY.atlanFieldName) - val anchorIdx = header.indexOf(GlossaryCategory.ANCHOR.atlanFieldName) - return if (nameIdx >= 0 && parentIdx >= 0 && anchorIdx >= 0) { - val glossaryName = row[anchorIdx] - val categoryPath = if (row[parentIdx].isBlank()) { - row[nameIdx] + override fun getCacheId(deserializer: RowDeserializer): String { + val glossaryIdx = deserializer.heading.indexOf(GlossaryCategory.ANCHOR.atlanFieldName) + val parentCategory = deserializer.getValue(GlossaryCategory.PARENT_CATEGORY.atlanFieldName)?.let { it as GlossaryCategory } + val categoryName = deserializer.getValue(GlossaryCategory.NAME.atlanFieldName)?.let { it as String } ?: "" + return if (glossaryIdx >= 0 && categoryName.isNotBlank()) { + val glossaryName = deserializer.row[glossaryIdx].ifBlank { "" } + val categoryPath = if (parentCategory == null) { + categoryName } else { - val parentPath = row[parentIdx].split(CATEGORY_DELIMITER)[0] - "$parentPath$CATEGORY_DELIMITER${row[nameIdx]}" + val parentIdx = deserializer.heading.indexOf(GlossaryCategory.PARENT_CATEGORY.atlanFieldName) + val parentPath = deserializer.row[parentIdx].split(CATEGORY_DELIMITER)[0] + "$parentPath$CATEGORY_DELIMITER$categoryName" } "$categoryPath$GLOSSARY_DELIMITER$glossaryName" } else { diff --git a/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/GTCImporter.kt b/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/GTCImporter.kt index c0e5ea0d38..78568bb6a8 100644 --- a/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/GTCImporter.kt +++ b/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/GTCImporter.kt @@ -4,12 +4,11 @@ package com.atlan.pkg.aim import com.atlan.model.assets.Asset import com.atlan.model.fields.AtlanField -import com.atlan.pkg.aim.Importer.clearField import com.atlan.pkg.cache.AssetCache -import com.atlan.pkg.serde.RowDeserialization +import com.atlan.pkg.serde.FieldSerde import com.atlan.pkg.serde.RowDeserializer +import com.atlan.pkg.serde.csv.CSVImporter import mu.KLogger -import kotlin.system.exitProcess /** * Import glossaries, terms and categories (only) into Atlan from a provided CSV file. @@ -28,37 +27,27 @@ import kotlin.system.exitProcess * @param logger through which to log any problems */ abstract class GTCImporter( - private val filename: String, - private val attrsToOverwrite: List, - private val updateOnly: Boolean, - private val batchSize: Int, + filename: String, + attrsToOverwrite: List, + updateOnly: Boolean, + batchSize: Int, protected val cache: AssetCache, - protected val typeNameFilter: String, - protected val logger: KLogger, -) : AssetGenerator { - /** - * Actually run the import. - */ - open fun import() { - cache.preload() - CSVReader(filename, updateOnly).use { csv -> - val start = System.currentTimeMillis() - val anyFailures = csv.streamRows(this, batchSize, logger) - logger.info { "Total time taken: ${System.currentTimeMillis() - start} ms" } - if (anyFailures) { - logger.error { "Some errors detected, failing the workflow." 
} - exitProcess(1) - } - cacheCreated(csv.created) - } - } - + typeNameFilter: String, + logger: KLogger, +) : CSVImporter( + filename, + logger, + typeNameFilter, + attrsToOverwrite, + updateOnly = updateOnly, + batchSize = batchSize, +) { /** * Cache any created assets. * * @param map from GUID to asset that was created */ - fun cacheCreated(map: Map) { + override fun cacheCreated(map: Map) { // Cache any assets that were created by processing map.keys.forEach { k -> // We must look up the asset and then cache to ensure we have the necessary identity @@ -70,81 +59,32 @@ abstract class GTCImporter( } } - /** - * Translate a row of CSV values into a term object, overwriting any attributes that were empty - * in the CSV with blank values, per the job configuration. - * - * @param row of values in the CSV - * @param header names of columns (and their position) in the header of the CSV - * @param typeIdx numeric index of the column containing the typeName of the asset in the row - * @param qnIdx numeric index of the column containing the qualifiedName of the asset in the row - * @param skipColumns columns to skip, i.e. that need to be processed in a later pass - * @return the deserialized asset object(s) - */ - override fun buildFromRow(row: List, header: List, typeIdx: Int, qnIdx: Int, skipColumns: Set): RowDeserialization? { - // Deserialize the objects represented in that row (could be more than one due to flattening - // of in particular things like READMEs and Links) - if (includeRow(row, header, typeIdx, qnIdx)) { - val revisedRow = generateQualifiedName(row, header, typeIdx, qnIdx) - val assets = RowDeserializer( - heading = header, - row = revisedRow, - typeIdx = typeIdx, - qnIdx = qnIdx, - logger = logger, - skipColumns = skipColumns, - ).getAssets() - if (assets != null) { - val builder = assets.primary - val candidate = builder.build() - val identity = RowDeserialization.AssetIdentity(candidate.typeName, candidate.qualifiedName) - // Then apply any field clearances based on attributes configured in the job - for (field in attrsToOverwrite) { - clearField(field, candidate, builder) - // If there are no related assets - if (!assets.related.containsKey(field.atlanFieldName)) { - assets.delete.add(field) - } - } - return RowDeserialization(identity, builder, assets.related, assets.delete) - } - } - return null + /** {@inheritDoc} */ + override fun getBuilder(deserializer: RowDeserializer): Asset.AssetBuilder<*, *> { + val qualifiedName = generateQualifiedName(deserializer) + return FieldSerde.getBuilderForType(typeNameFilter) + .qualifiedName(qualifiedName) } /** * Calculate a fallback qualifiedName, if the qualifiedName value in this row is empty. 
* - * @param row of values - * @param header column names - * @param typeIdx index of the typeName - * @param qnIdx index of the qualifiedName - * @return the original row of data with a qualifiedName filled in, if it was blank to begin with + * @param deserializer a row of deserialized values + * @return the qualifiedName, calculated from the deserialized values */ - fun generateQualifiedName(row: List, header: List, typeIdx: Int, qnIdx: Int): List { - val revised = mutableListOf() - for (i in row.indices) { - when { - i == qnIdx -> { - revised.add( - row[i].ifBlank { - val cacheId = getCacheId(row, header) - cache.getByIdentity(cacheId)?.qualifiedName ?: cacheId - }, - ) - } - else -> revised.add(row[i]) - } + private fun generateQualifiedName(deserializer: RowDeserializer): String { + val qn = deserializer.getValue(Asset.QUALIFIED_NAME.atlanFieldName)?.let { it as String } ?: "" + return qn.ifBlank { + val cacheId = getCacheId(deserializer) + cache.getByIdentity(cacheId)?.qualifiedName ?: cacheId } - return revised } /** * Calculate the cache identity for this row of the CSV, based purely on the information in the CSV. * - * @param row of values - * @param header column names + * @param deserializer a row of deserialized values * @return the cache identity for the row */ - abstract fun getCacheId(row: List, header: List): String + abstract fun getCacheId(deserializer: RowDeserializer): String } diff --git a/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/GlossaryImporter.kt b/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/GlossaryImporter.kt index fdf829c257..748d017d79 100644 --- a/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/GlossaryImporter.kt +++ b/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/GlossaryImporter.kt @@ -5,6 +5,7 @@ package com.atlan.pkg.aim import com.atlan.model.assets.Glossary import com.atlan.model.fields.AtlanField import com.atlan.pkg.cache.GlossaryCache +import com.atlan.pkg.serde.RowDeserializer import mu.KotlinLogging /** @@ -35,17 +36,7 @@ class GlossaryImporter( logger = KotlinLogging.logger {}, ) { /** {@inheritDoc} */ - override fun includeRow(row: List, header: List, typeIdx: Int, qnIdx: Int): Boolean { - return row[typeIdx] == typeNameFilter - } - - /** {@inheritDoc} */ - override fun getCacheId(row: List, header: List): String { - val nameIdx = header.indexOf(Glossary.NAME.atlanFieldName) - return if (nameIdx >= 0) { - row[nameIdx] - } else { - "" - } + override fun getCacheId(deserializer: RowDeserializer): String { + return deserializer.getValue(Glossary.NAME.atlanFieldName)?.let { it as String } ?: "" } } diff --git a/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/Importer.kt b/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/Importer.kt index 7fa1757935..f204122379 100644 --- a/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/Importer.kt +++ b/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/Importer.kt @@ -3,16 +3,11 @@ package com.atlan.pkg.aim import AssetImportCfg -import com.atlan.cache.ReflectionCache -import com.atlan.model.assets.Asset -import com.atlan.model.fields.AtlanField -import com.atlan.model.fields.SearchableField import com.atlan.pkg.Utils import com.atlan.pkg.cache.LinkCache import com.atlan.pkg.serde.FieldSerde -import com.atlan.serde.Serde +import com.atlan.pkg.serde.csv.CSVImporter.Companion.attributesToClear import mu.KotlinLogging -import java.lang.reflect.InvocationTargetException import 
kotlin.system.exitProcess /** @@ -30,10 +25,10 @@ object Importer { val assetsFilename = Utils.getOrDefault(config.assetsFile, "") val glossariesFilename = Utils.getOrDefault(config.glossariesFile, "") val assetAttrsToOverwrite = - attributesToClear(Utils.getOrDefault(config.assetsAttrToOverwrite, listOf()).toMutableList(), "assets") + attributesToClear(Utils.getOrDefault(config.assetsAttrToOverwrite, listOf()).toMutableList(), "assets", logger) val assetsFailOnErrors = Utils.getOrDefault(config.assetsFailOnErrors, true) val glossaryAttrsToOverwrite = - attributesToClear(Utils.getOrDefault(config.glossariesAttrToOverwrite, listOf()).toMutableList(), "glossaries") + attributesToClear(Utils.getOrDefault(config.glossariesAttrToOverwrite, listOf()).toMutableList(), "glossaries", logger) val assetsUpdateOnly = Utils.getOrDefault(config.assetsUpsertSemantic, "update") == "update" val glossariesUpdateOnly = Utils.getOrDefault(config.glossariesUpsertSemantic, "update") == "update" val glossariesFailOnErrors = Utils.getOrDefault(config.glossariesFailOnErrors, true) @@ -67,75 +62,4 @@ object Importer { assetImporter.import() } } - - /** - * Determine which (if any) attributes should be cleared (removed) if they are empty in the input file. - * - * @param attrNames the list of attribute names provided through the configuration - * @param fileInfo a descriptor to qualify for which file the attributes are being set - * @return parsed list of attribute names to be cleared - */ - private fun attributesToClear(attrNames: MutableList, fileInfo: String): List { - if (attrNames.contains(Asset.CERTIFICATE_STATUS.atlanFieldName)) { - attrNames.add(Asset.CERTIFICATE_STATUS_MESSAGE.atlanFieldName) - } - if (attrNames.contains(Asset.ANNOUNCEMENT_TYPE.atlanFieldName)) { - attrNames.add(Asset.ANNOUNCEMENT_TITLE.atlanFieldName) - attrNames.add(Asset.ANNOUNCEMENT_MESSAGE.atlanFieldName) - } - logger.info { "Adding attributes to be cleared, if blank (for $fileInfo): $attrNames" } - val attrFields = mutableListOf() - for (name in attrNames) { - attrFields.add(SearchableField(name, name)) - } - return attrFields - } - - /** - * Check if the provided field should be cleared, and if so clear it. 
- * - * @param field to check if it is empty and should be cleared - * @param candidate the asset on which to check whether the field is empty (or not) - * @param builder the builder against which to clear the field - * @return true if the field was cleared, false otherwise - */ - internal fun clearField(field: AtlanField, candidate: Asset, builder: Asset.AssetBuilder<*, *>): Boolean { - try { - val getter = ReflectionCache.getGetter( - Serde.getAssetClassForType(candidate.typeName), - field.atlanFieldName, - ) - val value = getter.invoke(candidate) - if (value == null || - (Collection::class.java.isAssignableFrom(value.javaClass) && (value as Collection<*>).isEmpty()) - ) { - builder.nullField(field.atlanFieldName) - return true - } - } catch (e: ClassNotFoundException) { - logger.error( - "Unknown type {} — cannot clear {}.", - candidate.typeName, - field.atlanFieldName, - e, - ) - } catch (e: IllegalAccessException) { - logger.error( - "Unable to clear {} on: {}::{}", - field.atlanFieldName, - candidate.typeName, - candidate.qualifiedName, - e, - ) - } catch (e: InvocationTargetException) { - logger.error( - "Unable to clear {} on: {}::{}", - field.atlanFieldName, - candidate.typeName, - candidate.qualifiedName, - e, - ) - } - return false - } } diff --git a/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/TermImporter.kt b/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/TermImporter.kt index 8be32692e8..48260db6e6 100644 --- a/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/TermImporter.kt +++ b/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/TermImporter.kt @@ -2,13 +2,14 @@ Copyright 2023 Atlan Pte. Ltd. */ package com.atlan.pkg.aim +import com.atlan.model.assets.GlossaryCategory import com.atlan.model.assets.GlossaryTerm import com.atlan.model.fields.AtlanField import com.atlan.pkg.cache.TermCache +import com.atlan.pkg.serde.RowDeserializer import com.atlan.pkg.serde.cell.GlossaryTermXformer import com.atlan.pkg.serde.cell.GlossaryXformer import mu.KotlinLogging -import kotlin.system.exitProcess /** * Import glossaries (only) into Atlan from a provided CSV file. @@ -44,47 +45,28 @@ class TermImporter( ) /** {@inheritDoc} */ - override fun import() { + override fun import(columnsToSkip: Set) { cache.preload() // Import categories by level, top-to-bottom, and stop when we hit a level with no categories logger.info { "--- Loading terms in first pass, without term-to-term relationships... ---" } - CSVReader(filename, updateOnly).use { csv -> - val start = System.currentTimeMillis() - val anyFailures = csv.streamRows(this, batchSize, logger, GlossaryTermXformer.TERM_TO_TERM_FIELDS) - logger.info { "Total time taken: ${System.currentTimeMillis() - start} ms" } - if (anyFailures) { - logger.error { "Some errors detected, failing the workflow." } - exitProcess(1) - } - cacheCreated(csv.created) - } + super.import(GlossaryTermXformer.TERM_TO_TERM_FIELDS) // In this second pass we need to ignore fields that were loaded in the first pass, // or we will end up with duplicates (links) or extra audit log messages (tags, README) logger.info { "--- Loading term-to-term relationships (second pass)... ---" } - CSVReader(filename, updateOnly).use { csv -> - val start = System.currentTimeMillis() - val anyFailures = csv.streamRows(this, batchSize, logger, secondPassIgnore) - logger.info { "Total time taken: ${System.currentTimeMillis() - start} ms" } - if (anyFailures) { - logger.error { "Some errors detected, failing the workflow." 
} - exitProcess(1) - } - } + super.import(secondPassIgnore) } /** {@inheritDoc} */ - override fun includeRow(row: List<String>, header: List<String>, typeIdx: Int, qnIdx: Int): Boolean { - return row[typeIdx] == typeNameFilter - } - - /** {@inheritDoc} */ - override fun getCacheId(row: List<String>, header: List<String>): String { - val nameIdx = header.indexOf(GlossaryTerm.NAME.atlanFieldName) - val anchorIdx = header.indexOf(GlossaryTerm.ANCHOR.atlanFieldName) - return if (nameIdx >= 0 && anchorIdx >= 0) { - val glossaryName = row[anchorIdx] - val termName = row[nameIdx] - "$termName${GlossaryXformer.GLOSSARY_DELIMITER}$glossaryName" + override fun getCacheId(deserializer: RowDeserializer): String { + val glossaryIdx = deserializer.heading.indexOf(GlossaryCategory.ANCHOR.atlanFieldName) + val termName = deserializer.getValue(GlossaryTerm.NAME.atlanFieldName)?.let { it as String } ?: "" + return if (glossaryIdx >= 0) { + val glossaryName = deserializer.row[glossaryIdx].ifBlank { "" } + if (glossaryName.isNotBlank() && termName.isNotBlank()) { + "$termName${GlossaryXformer.GLOSSARY_DELIMITER}$glossaryName" + } else { + "" + } } else { "" } diff --git a/samples/packages/duplicate-detector/src/main/kotlin/DuplicateDetector.kt b/samples/packages/duplicate-detector/src/main/kotlin/DuplicateDetector.kt index 4b733deae2..3a85ab6e8d 100644 --- a/samples/packages/duplicate-detector/src/main/kotlin/DuplicateDetector.kt +++ b/samples/packages/duplicate-detector/src/main/kotlin/DuplicateDetector.kt @@ -14,6 +14,7 @@ import com.atlan.model.enums.CertificateStatus import com.atlan.model.search.CompoundQuery import com.atlan.pkg.Utils import com.atlan.util.AssetBatch +import com.atlan.util.ParallelBatch import mu.KotlinLogging import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicLong @@ -126,7 +127,7 @@ object DuplicateDetector { val keys = hashToAssetKeys[hash] if (keys?.size!!
> 1) { val columns = hashToColumns[hash] - val batch = AssetBatch( + val batch = ParallelBatch( Atlan.getDefaultClient(), batchSize, false, diff --git a/samples/packages/relational-assets-builder/build.gradle.kts b/samples/packages/relational-assets-builder/build.gradle.kts index a41ebb5eca..95bba03964 100644 --- a/samples/packages/relational-assets-builder/build.gradle.kts +++ b/samples/packages/relational-assets-builder/build.gradle.kts @@ -1,28 +1,4 @@ -val jarPath = "$rootDir/jars" - +/* SPDX-License-Identifier: Apache-2.0 */ plugins { id("com.atlan.kotlin-custom-package") - alias(libs.plugins.shadow) -} - -dependencies { - implementation("de.siegmar:fastcsv:2.2.2") -} - -tasks { - shadowJar { - isZip64 = true - destinationDirectory.set(file(jarPath)) - dependencies { - include(dependency("de.siegmar:fastcsv:.*")) - } - mergeServiceFiles() - } - - jar { - // Override the default jar task so we get the shadowed jar - // as the only jar output - actions = listOf() - doLast { shadowJar } - } } diff --git a/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/AssetImporter.kt b/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/AssetImporter.kt index 67014aeb78..b92d7d2ac8 100644 --- a/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/AssetImporter.kt +++ b/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/AssetImporter.kt @@ -12,11 +12,8 @@ import com.atlan.model.assets.Schema import com.atlan.model.assets.Table import com.atlan.model.assets.View import com.atlan.model.fields.AtlanField -import com.atlan.pkg.rab.Importer.clearField -import com.atlan.pkg.serde.RowDeserialization -import com.atlan.pkg.serde.RowDeserializer +import com.atlan.pkg.serde.csv.CSVImporter import mu.KLogger -import kotlin.system.exitProcess /** * Import assets into Atlan from a provided CSV file. @@ -36,20 +33,32 @@ abstract class AssetImporter( private val attrsToOverwrite: List<AtlanField>, private val updateOnly: Boolean, private val batchSize: Int, - protected val typeNameFilter: String, - protected val logger: KLogger, -) : AssetGenerator { - // Can skip all of these columns when deserializing a row as they will be set by - // the creator methods anyway - private val skipColumns = setOf( - Asset.CONNECTION_NAME.atlanFieldName, - ConnectionImporter.CONNECTOR_TYPE, - ISQL.DATABASE_NAME.atlanFieldName, - ISQL.SCHEMA_NAME.atlanFieldName, - ENTITY_NAME, - ColumnImporter.COLUMN_PARENT_QN, - Column.ORDER.atlanFieldName, - ) + typeNameFilter: String, + logger: KLogger, +) : CSVImporter( + filename, + logger, + typeNameFilter, + attrsToOverwrite, + batchSize = batchSize, +) { + + /** {@inheritDoc} */ + override fun import(columnsToSkip: Set<String>) { + // Can skip all of these columns when deserializing a row as they will be set by + // the creator methods anyway + super.import( + setOf( + Asset.CONNECTION_NAME.atlanFieldName, + ConnectionImporter.CONNECTOR_TYPE, + ISQL.DATABASE_NAME.atlanFieldName, + ISQL.SCHEMA_NAME.atlanFieldName, + ENTITY_NAME, + ColumnImporter.COLUMN_PARENT_QN, + Column.ORDER.atlanFieldName, + ), + ) + } companion object { const val ENTITY_NAME = "entityName" @@ -105,67 +114,6 @@ abstract class AssetImporter( } } - /** - * Actually run the import.
- */ - fun import() { - CSVReader(filename, updateOnly).use { csv -> - val start = System.currentTimeMillis() - val anyFailures = csv.streamRows(this, batchSize, logger, skipColumns) - logger.info { "Total time taken: ${System.currentTimeMillis() - start} ms" } - if (anyFailures) { - logger.error { "Some errors detected, failing the workflow." } - exitProcess(1) - } - cacheCreated(csv.created) - } - } - - /** - * Translate a row of CSV values into an asset object, overwriting any attributes that were empty - * in the CSV with blank values, per the job configuration. - * - * @param row of values in the CSV - * @param header names of columns (and their position) in the header of the CSV - * @param typeIdx index of the typeName column - * @param skipColumns columns to skip, i.e. that need to be processed in a later pass - * @return the deserialized asset object(s) - */ - override fun buildFromRow(row: List<String>, header: List<String>, typeIdx: Int, skipColumns: Set<String>): RowDeserialization? { - // Deserialize the objects represented in that row (could be more than one due to flattening - // of in particular things like READMEs and Links) - if (includeRow(row, header, typeIdx)) { - val deserializer = RowDeserializer( - heading = header, - row = row, - typeName = typeNameFilter, - logger = logger, - skipColumns = skipColumns, - ) - val assets = deserializer.getAssets(getBuilder(deserializer)) - if (assets != null) { - val builder = assets.primary - val candidate = builder.build() - val identity = RowDeserialization.AssetIdentity(candidate.typeName, candidate.qualifiedName) - // Then apply any field clearances based on attributes configured in the job - for (field in attrsToOverwrite) { - clearField(field, candidate, builder) - // If there are no related assets - if (!assets.related.containsKey(field.atlanFieldName)) { - assets.delete.add(field) - } - } - return RowDeserialization(identity, builder, assets.related, assets.delete) - } - } - return null - } - - /** {@inheritDoc} */ - override fun includeRow(row: List<String>, header: List<String>, typeIdx: Int): Boolean { - return row[typeIdx] == typeNameFilter - } - data class QualifiedNameDetails( val uniqueQN: String, val partialQN: String, diff --git a/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/Importer.kt b/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/Importer.kt index b077272e68..d268539206 100644 --- a/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/Importer.kt +++ b/samples/packages/relational-assets-builder/src/main/kotlin/com/atlan/pkg/rab/Importer.kt @@ -3,7 +3,6 @@ package com.atlan.pkg.rab import RelationalAssetsBuilderCfg -import com.atlan.cache.ReflectionCache import com.atlan.model.assets.Asset import com.atlan.model.assets.Column import com.atlan.model.assets.Connection @@ -12,19 +11,16 @@ import com.atlan.model.assets.MaterializedView import com.atlan.model.assets.Schema import com.atlan.model.assets.Table import com.atlan.model.assets.View -import com.atlan.model.fields.AtlanField -import com.atlan.model.fields.SearchableField import com.atlan.pkg.Utils import com.atlan.pkg.cache.ConnectionCache import com.atlan.pkg.cache.LinkCache import com.atlan.pkg.cache.TermCache import com.atlan.pkg.rab.AssetImporter.Companion.getQualifiedNameDetails import com.atlan.pkg.serde.FieldSerde -import com.atlan.serde.Serde +import com.atlan.pkg.serde.csv.CSVImporter import de.siegmar.fastcsv.reader.CsvReader import de.siegmar.fastcsv.writer.CsvWriter import mu.KotlinLogging -import
java.lang.reflect.InvocationTargetException import java.nio.file.Paths import java.nio.file.StandardOpenOption import java.util.concurrent.atomic.AtomicInteger @@ -44,7 +40,7 @@ object Importer { val batchSize = 20 val assetsFilename = Utils.getOrDefault(config.assetsFile, "") val assetAttrsToOverwrite = - attributesToClear(Utils.getOrDefault(config.assetsAttrToOverwrite, listOf()).toMutableList(), "assets") + CSVImporter.attributesToClear(Utils.getOrDefault(config.assetsAttrToOverwrite, listOf()).toMutableList(), "assets", logger) val assetsFailOnErrors = Utils.getOrDefault(config.assetsFailOnErrors, true) val assetsUpdateOnly = Utils.getOrDefault(config.assetsUpsertSemantic, "update") == "update" @@ -100,77 +96,6 @@ object Importer { columnImporter.import() } - /** - * Determine which (if any) attributes should be cleared (removed) if they are empty in the input file. - * - * @param attrNames the list of attribute names provided through the configuration - * @param fileInfo a descriptor to qualify for which file the attributes are being set - * @return parsed list of attribute names to be cleared - */ - private fun attributesToClear(attrNames: MutableList<String>, fileInfo: String): List<AtlanField> { - if (attrNames.contains(Asset.CERTIFICATE_STATUS.atlanFieldName)) { - attrNames.add(Asset.CERTIFICATE_STATUS_MESSAGE.atlanFieldName) - } - if (attrNames.contains(Asset.ANNOUNCEMENT_TYPE.atlanFieldName)) { - attrNames.add(Asset.ANNOUNCEMENT_TITLE.atlanFieldName) - attrNames.add(Asset.ANNOUNCEMENT_MESSAGE.atlanFieldName) - } - logger.info { "Adding attributes to be cleared, if blank (for $fileInfo): $attrNames" } - val attrFields = mutableListOf<AtlanField>() - for (name in attrNames) { - attrFields.add(SearchableField(name, name)) - } - return attrFields - } - - /** - * Check if the provided field should be cleared, and if so clear it. - * - * @param field to check if it is empty and should be cleared - * @param candidate the asset on which to check whether the field is empty (or not) - * @param builder the builder against which to clear the field - * @return true if the field was cleared, false otherwise - */ - internal fun clearField(field: AtlanField, candidate: Asset, builder: Asset.AssetBuilder<*, *>): Boolean { - try { - val getter = ReflectionCache.getGetter( - Serde.getAssetClassForType(candidate.typeName), - field.atlanFieldName, - ) - val value = getter.invoke(candidate) - if (value == null || - (Collection::class.java.isAssignableFrom(value.javaClass) && (value as Collection<*>).isEmpty()) - ) { - builder.nullField(field.atlanFieldName) - return true - } - } catch (e: ClassNotFoundException) { - logger.error( - "Unknown type {} — cannot clear {}.", - candidate.typeName, - field.atlanFieldName, - e, - ) - } catch (e: IllegalAccessException) { - logger.error( - "Unable to clear {} on: {}::{}", - field.atlanFieldName, - candidate.typeName, - candidate.qualifiedName, - e, - ) - } catch (e: InvocationTargetException) { - logger.error( - "Unable to clear {} on: {}::{}", - field.atlanFieldName, - candidate.typeName, - candidate.qualifiedName, - e, - ) - } - return false - } - private fun preprocessCSV(originalFile: String): PreprocessedCsv { // Setup val fieldSeparator = ',' diff --git a/sdk/src/main/java/com/atlan/util/ParallelBatch.java b/sdk/src/main/java/com/atlan/util/ParallelBatch.java new file mode 100644 index 0000000000..93086adb7b --- /dev/null +++ b/sdk/src/main/java/com/atlan/util/ParallelBatch.java @@ -0,0 +1,214 @@ +/* SPDX-License-Identifier: Apache-2.0 + Copyright 2023 Atlan Pte. Ltd.
*/ +package com.atlan.util; + +import com.atlan.AtlanClient; +import com.atlan.exception.AtlanException; +import com.atlan.model.assets.Asset; +import com.atlan.model.core.AssetMutationResponse; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * A batch that can safely be shared across multiple threads: each thread transparently accumulates into its own {@link AssetBatch} (keyed by thread id), and results are merged across all of those batches once processing completes. + */ +public class ParallelBatch { + + private final AtlanClient client; + private final int maxSize; + private final boolean replaceAtlanTags; + private final AssetBatch.CustomMetadataHandling customMetadataHandling; + private final boolean captureFailures; + private final boolean updateOnly; + private final Map<Long, AssetBatch> batchMap; + + private List<Asset> created = null; + private List<Asset> updated = null; + private List<AssetBatch.FailedBatch> failures = null; + private List<Asset> skipped = null; + private Map<String, String> resolvedGuids = null; + + /** + * Create a new batch of assets to be bulk-saved, in parallel (across threads). + * + * @param client connectivity to Atlan + * @param maxSize maximum size of each batch that should be processed (per API call) + */ + public ParallelBatch(AtlanClient client, int maxSize) { + this(client, maxSize, false, AssetBatch.CustomMetadataHandling.IGNORE); + } + + /** + * Create a new batch of assets to be bulk-saved, in parallel (across threads). + * + * @param client connectivity to Atlan + * @param maxSize maximum size of each batch that should be processed (per API call) + * @param replaceAtlanTags if true, all Atlan tags on an existing asset will be overwritten; if false, all Atlan tags will be ignored + * @param customMetadataHandling how to handle custom metadata (ignore it, replace it (wiping out anything pre-existing), or merge it) + */ + public ParallelBatch( + AtlanClient client, + int maxSize, + boolean replaceAtlanTags, + AssetBatch.CustomMetadataHandling customMetadataHandling) { + this(client, maxSize, replaceAtlanTags, customMetadataHandling, false); + } + + /** + * Create a new batch of assets to be bulk-saved, in parallel (across threads). + * + * @param client connectivity to Atlan + * @param maxSize maximum size of each batch that should be processed (per API call) + * @param replaceAtlanTags if true, all Atlan tags on an existing asset will be overwritten; if false, all Atlan tags will be ignored + * @param customMetadataHandling how to handle custom metadata (ignore it, replace it (wiping out anything pre-existing), or merge it) + * @param captureFailures when true, any failed batches will be captured and retained rather than exceptions being raised (for large amounts of processing this could cause memory issues!) + */ + public ParallelBatch( + AtlanClient client, + int maxSize, + boolean replaceAtlanTags, + AssetBatch.CustomMetadataHandling customMetadataHandling, + boolean captureFailures) { + this(client, maxSize, replaceAtlanTags, customMetadataHandling, captureFailures, false); + } + + /** + * Create a new batch of assets to be bulk-saved, in parallel (across threads).
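+ *
+ * <p>Each calling thread gets its own underlying {@link AssetBatch}, so {@link #add(Asset)} can be
+ * called from a parallel stream without any explicit locking. A minimal sketch, assuming a
+ * {@code client} and a list of {@code assets} already exist (illustrative only, not part of this API):
+ * <pre>{@code
+ * ParallelBatch batch = new ParallelBatch(client, 20);
+ * assets.parallelStream().forEach(a -> {
+ *     try {
+ *         batch.add(a); // accumulates into this thread's own AssetBatch
+ *     } catch (AtlanException e) {
+ *         throw new IllegalStateException(e);
+ *     }
+ * });
+ * batch.flush(); // submit whatever remains in every thread-local batch
+ * }</pre>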
+ * + * @param client connectivity to Atlan + * @param maxSize maximum size of each batch that should be processed (per API call) + * @param replaceAtlanTags if true, all Atlan tags on an existing asset will be overwritten; if false, all Atlan tags will be ignored + * @param customMetadataHandling how to handle custom metadata (ignore it, replace it (wiping out anything pre-existing), or merge it) + * @param captureFailures when true, any failed batches will be captured and retained rather than exceptions being raised (for large amounts of processing this could cause memory issues!) + * @param updateOnly when true, only attempt to update existing assets and do not create any assets (note: this will incur a performance penalty) + */ + public ParallelBatch( + AtlanClient client, + int maxSize, + boolean replaceAtlanTags, + AssetBatch.CustomMetadataHandling customMetadataHandling, + boolean captureFailures, + boolean updateOnly) { + this.client = client; + this.maxSize = maxSize; + this.replaceAtlanTags = replaceAtlanTags; + this.customMetadataHandling = customMetadataHandling; + this.captureFailures = captureFailures; + this.updateOnly = updateOnly; + this.batchMap = new ConcurrentHashMap<>(); + } + + /** + * Add an asset to the batch to be processed. + * + * @param single the asset to add to a batch + * @return the assets that were created or updated in this batch, or null if the batch is still queued + * @throws AtlanException on any problems adding the asset to or processing the batch + */ + public AssetMutationResponse add(Asset single) throws AtlanException { + long id = Thread.currentThread().getId(); + // each thread only ever reads and writes its own entry, so this check-then-put is race-free + if (!batchMap.containsKey(id)) { + batchMap.put( + id, + new AssetBatch( + client, maxSize, replaceAtlanTags, customMetadataHandling, captureFailures, updateOnly)); + } + return batchMap.get(id).add(single); + } + + /** + * Flush any remaining assets in the parallel batches. + * + * @throws IllegalStateException on any problems flushing (submitting) any of the parallel batches, wrapping the underlying AtlanException (checked exceptions cannot propagate out of the parallel stream) + */ + public void flush() throws AtlanException { + batchMap.values().parallelStream().forEach(batch -> { + try { + batch.flush(); + } catch (AtlanException e) { + throw new IllegalStateException(e); + } + }); + } + + /** + * Assets that were created (minimal info only). + * + * @return all created assets, across all parallel batches + */ + public List<Asset> getCreated() { + if (created == null) { + List<Asset> list = new ArrayList<>(); + for (AssetBatch batch : batchMap.values()) { + list.addAll(batch.getCreated()); + } + created = Collections.unmodifiableList(list); + } + return created; + } + + /** + * Assets that were updated (minimal info only). + * + * @return all updated assets, across all parallel batches + */ + public List<Asset> getUpdated() { + if (updated == null) { + List<Asset> list = new ArrayList<>(); + for (AssetBatch batch : batchMap.values()) { + list.addAll(batch.getUpdated()); + } + updated = Collections.unmodifiableList(list); + } + return updated; + } + + /** + * Batches that failed to be committed (only populated when captureFailures is set to true). + * + * @return all batches that failed, across all parallel batches + */ + public List<AssetBatch.FailedBatch> getFailures() { + if (failures == null) { + List<AssetBatch.FailedBatch> list = new ArrayList<>(); + for (AssetBatch batch : batchMap.values()) { + list.addAll(batch.getFailures()); + } + failures = Collections.unmodifiableList(list); + } + return failures; + } + + /** + * Assets that were skipped, when updateOnly is requested and the asset does not exist in Atlan.
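+ * <p>As with the other result accessors, the result is merged lazily across all thread-local
+ * batches on first access and then cached as an unmodifiable list, so it should only be read
+ * once all batches have been flushed.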
+ * + * @return all assets that were skipped, across all parallel batches + */ + public List<Asset> getSkipped() { + if (skipped == null) { + List<Asset> list = new ArrayList<>(); + for (AssetBatch batch : batchMap.values()) { + list.addAll(batch.getSkipped()); + } + skipped = Collections.unmodifiableList(list); + } + return skipped; + } + + /** + * Map from placeholder GUID to resolved (actual) GUID, for all assets that were processed through the batch. + * + * @return all resolved GUIDs, across all parallel batches + */ + public Map<String, String> getResolvedGuids() { + if (resolvedGuids == null) { + Map<String, String> map = new HashMap<>(); + for (AssetBatch batch : batchMap.values()) { + map.putAll(batch.getResolvedGuids()); + } + resolvedGuids = Collections.unmodifiableMap(map); + } + return resolvedGuids; + } +} diff --git a/sdk/src/test/resources/logback-test.xml b/sdk/src/test/resources/logback-test.xml deleted file mode 100644 index 8b2eab1266..0000000000 --- a/sdk/src/test/resources/logback-test.xml +++ /dev/null @@ -1,12 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- SPDX-License-Identifier: Apache-2.0 --> -<configuration> -<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> -<encoder> -<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} [ReqId:%X{X-Atlan-Request-Id}] - %msg%n</pattern> -</encoder> -</appender> -<root level="info"> -<appender-ref ref="STDOUT"/> -</root> -</configuration>
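-- 
A minimal end-to-end sketch of ParallelBatch as introduced above (illustrative only: the default-client lookup and the empty asset list stand in for whatever a real caller, such as the refactored CSV importers, would supply):

    import com.atlan.Atlan;
    import com.atlan.AtlanClient;
    import com.atlan.exception.AtlanException;
    import com.atlan.model.assets.Asset;
    import com.atlan.util.AssetBatch;
    import com.atlan.util.ParallelBatch;
    import java.util.List;

    public class ParallelBatchExample {
        public static void main(String[] args) throws AtlanException {
            AtlanClient client = Atlan.getDefaultClient();
            List<Asset> assets = List.of(); // illustrative: assets would be built elsewhere
            // capture failures rather than raising, and only update existing assets
            ParallelBatch batch = new ParallelBatch(
                    client, 20, false, AssetBatch.CustomMetadataHandling.IGNORE, true, true);
            assets.parallelStream().forEach(a -> {
                try {
                    batch.add(a); // routed to this thread's own underlying AssetBatch
                } catch (AtlanException e) {
                    throw new IllegalStateException(e);
                }
            });
            batch.flush(); // submits any remainder across all thread-local batches
            System.out.println("Created: " + batch.getCreated().size());
            System.out.println("Updated: " + batch.getUpdated().size());
            System.out.println("Skipped: " + batch.getSkipped().size());
            System.out.println("Failed batches: " + batch.getFailures().size());
        }
    }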