From eb6edaa5a5d811c8e330eceb763da27e70b4e5bd Mon Sep 17 00:00:00 2001
From: Christopher Grote <cmgrote@users.noreply.github.com>
Date: Mon, 27 Jan 2025 14:26:33 +0000
Subject: [PATCH 1/6] Initial work for delta processing in asset import

Signed-off-by: Christopher Grote <cmgrote@users.noreply.github.com>
---
 .../com/atlan/pkg/util/DeltaProcessor.kt      | 10 +++
 .../src/main/kotlin/AssetImportCfg.kt         |  4 +
 .../kotlin/com/atlan/pkg/aim/AssetImporter.kt | 77 +++++++++++++++++--
 .../main/kotlin/com/atlan/pkg/aim/Importer.kt | 50 +++++++++---
 .../src/main/resources/package.pkl            | 44 +++++++++++
 5 files changed, 165 insertions(+), 20 deletions(-)

diff --git a/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/util/DeltaProcessor.kt b/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/util/DeltaProcessor.kt
index cf8fa48841..92836eabaa 100644
--- a/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/util/DeltaProcessor.kt
+++ b/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/util/DeltaProcessor.kt
@@ -62,6 +62,14 @@ class DeltaProcessor(
      */
     fun calculate() {
         if (semantic == "full") {
+            if (preprocessedDetails.multipleConnections) {
+                throw IllegalStateException(
+                    """
+                    Assets in multiple connections detected in the input file.
+                    Full delta processing currently only works for a single connection per input file, exiting.
+                    """.trimIndent(),
+                )
+            }
             if (qualifiedNamePrefix.isNullOrBlank()) {
                 logger.warn { "Unable to determine qualifiedName prefix, cannot calculate any delta." }
             } else {
@@ -219,12 +227,14 @@ class DeltaProcessor(
      * @param hasLinks whether there are any links in the input file
      * @param hasTermAssignments whether there are any term assignments in the input file
      * @param preprocessedFile full path to the preprocessed input file
+     * @param multipleConnections whether multiple connections were present in the input file (true) or only a single connection (false)
      */
     open class Results(
         val assetRootName: String,
         hasLinks: Boolean,
         hasTermAssignments: Boolean,
         val preprocessedFile: String,
+        val multipleConnections: Boolean = false,
     ) : RowPreprocessor.Results(
             hasLinks = hasLinks,
             hasTermAssignments = hasTermAssignments,
diff --git a/samples/packages/asset-import/src/main/kotlin/AssetImportCfg.kt b/samples/packages/asset-import/src/main/kotlin/AssetImportCfg.kt
index 98677f7d46..b7f097151f 100644
--- a/samples/packages/asset-import/src/main/kotlin/AssetImportCfg.kt
+++ b/samples/packages/asset-import/src/main/kotlin/AssetImportCfg.kt
@@ -20,6 +20,10 @@ data class AssetImportCfg(
     @JsonProperty("assets_prefix") val assetsPrefix: String = "",
     @JsonProperty("assets_key") val assetsKey: String = "",
     @JsonProperty("assets_upsert_semantic") val assetsUpsertSemantic: String = "update",
+    @JsonProperty("assets_delta_semantic") val assetsDeltaSemantic: String = "delta",
+    @JsonProperty("assets_delta_removal_type") val assetsDeltaRemovalType: String = "archive",
+    @JsonProperty("assets_delta_reload_calculation") val assetsDeltaReloadCalculation: String = "all",
+    @JsonProperty("assets_previous_file_direct") val assetsPreviousFileDirect: String = "",
     @JsonProperty("assets_config") val assetsConfig: String? = null,
     @JsonDeserialize(using = WidgetSerde.MultiSelectDeserializer::class)
     @JsonSerialize(using = WidgetSerde.MultiSelectSerializer::class)
diff --git a/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/AssetImporter.kt b/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/AssetImporter.kt
index 9f3f4e32f7..064a9f34d7 100644
--- a/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/AssetImporter.kt
+++ b/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/AssetImporter.kt
@@ -201,8 +201,13 @@ import com.atlan.pkg.serde.csv.CSVImporter
 import com.atlan.pkg.serde.csv.CSVPreprocessor
 import com.atlan.pkg.serde.csv.CSVXformer
 import com.atlan.pkg.serde.csv.ImportResults
-import com.atlan.pkg.serde.csv.RowPreprocessor
+import com.atlan.pkg.util.AssetResolver
+import com.atlan.pkg.util.AssetResolver.QualifiedNameDetails
+import com.atlan.pkg.util.DeltaProcessor
+import com.atlan.util.AssetBatch.AssetIdentity
+import com.atlan.util.StringUtils
 import mu.KLogger
+import java.io.IOException
 
 /**
  * Import assets into Atlan from a provided CSV file.
@@ -236,6 +241,8 @@ class AssetImporter(
     ) {
     private var header = emptyList<String>()
     private var typeToProcess = ""
+    private val connectionQNs = mutableSetOf<String>()
+    private val deltaProcessing = ctx.config.assetsDeltaSemantic == "full"
     private val cyclicalRelationships = mutableMapOf<String, MutableSet<RelationshipEnds>>()
     private val mapToSecondPass = mutableMapOf<String, MutableSet<String>>()
     private val secondPassRemain =
@@ -256,7 +263,7 @@ class AssetImporter(
     override fun preprocess(
         outputFile: String?,
         outputHeaders: List<String>?,
-    ): RowPreprocessor.Results {
+    ): DeltaProcessor.Results {
         // Retrieve all relationships and filter to any cyclical relationships
         // (meaning relationships where both ends are of the same type)
         val typeDefs = ctx.client.typeDefs.list(AtlanTypeCategory.RELATIONSHIP)
@@ -265,7 +272,13 @@ class AssetImporter(
             .filter { it.endDef1.type == it.endDef2.type }
             .forEach { cyclicalRelationships.getOrPut(it.endDef1.type) { mutableSetOf() }.add(RelationshipEnds(it.name, it.endDef1.name, it.endDef2.name)) }
         val results = super.preprocess(outputFile, outputHeaders)
-        return results
+        return DeltaProcessor.Results(
+            assetRootName = if (connectionQNs.isNotEmpty()) connectionQNs.first() else NO_CONNECTION_QN,
+            hasLinks = results.hasLinks,
+            hasTermAssignments = results.hasTermAssignments,
+            preprocessedFile = results.outputFile ?: filename,
+            multipleConnections = connectionQNs.size > 1,
+        )
     }
 
     /** {@inheritDoc} */
@@ -298,6 +311,11 @@ class AssetImporter(
                 mapToSecondPass.getOrPut(typeName) { mutableSetOf() }.add(two)
             }
         }
+        if (deltaProcessing) {
+            val qualifiedName = CSVXformer.trimWhitespace(row.getOrNull(header.indexOf(Asset.QUALIFIED_NAME.atlanFieldName)) ?: "")
+            val connectionQNFromAsset = StringUtils.getConnectionQualifiedName(qualifiedName)
+            connectionQNs.add(connectionQNFromAsset)
+        }
         return row
     }
 
@@ -405,7 +423,8 @@ class AssetImporter(
         val types: List<String>,
     )
 
-    companion object {
+    companion object : AssetResolver {
+        const val NO_CONNECTION_QN = "NO_CONNECTION_FOUND"
         private val ordering =
             listOf(
                 TypeGrouping(
@@ -814,12 +833,42 @@ class AssetImporter(
             types.sortedBy { t ->
                 ordering.flatMap { it.types }.indexOf(t).takeIf { it >= 0 } ?: Int.MAX_VALUE
             }
+
+        /** {@inheritDoc} */
+        override fun resolveAsset(
+            values: List<String>,
+            header: List<String>,
+            connectionsMap: Map<AssetResolver.ConnectionIdentity, String>,
+        ): AssetIdentity {
+            val typeIdx = header.indexOf(Asset.TYPE_NAME.atlanFieldName)
+            if (typeIdx < 0) {
+                throw IOException(
+                    "Unable to find the column 'typeName'. This is a mandatory column in the input CSV.",
+                )
+            }
+            val qnIdx = header.indexOf(Asset.QUALIFIED_NAME.atlanFieldName)
+            if (qnIdx < 0) {
+                throw IOException(
+                    "Unable to find the column 'qualifiedName'. This is a mandatory column in the input CSV.",
+                )
+            }
+            val typeName = CSVXformer.trimWhitespace(values[typeIdx])
+            val qualifiedName = CSVXformer.trimWhitespace(values[qnIdx])
+            return AssetIdentity(typeName, qualifiedName)
+        }
+
+        /** {@inheritDoc} */
+        override fun getQualifiedNameDetails(
+            row: List<String>,
+            header: List<String>,
+            typeName: String,
+        ): QualifiedNameDetails = throw IllegalStateException("This method should never be called. Please raise an issue if you discover this in any log file.")
     }
 
     /** Pre-process the assets import file. */
     private fun preprocess(): Results = Preprocessor(filename, fieldSeparator, logger).preprocess<Results>()
 
-    private class Preprocessor(
+    class Preprocessor(
         originalFile: String,
         fieldSeparator: Char,
         logger: KLogger,
@@ -829,6 +878,7 @@ class AssetImporter(
             fieldSeparator = fieldSeparator,
         ) {
         private val typesInFile = mutableSetOf<String>()
+        private var connectionQNs = mutableSetOf<String>()
 
         /** {@inheritDoc} */
         override fun preprocessRow(
@@ -842,6 +892,9 @@ class AssetImporter(
             if (typeName.isNotBlank()) {
                 typesInFile.add(row[typeIdx])
             }
+            val qualifiedName = CSVXformer.trimWhitespace(row.getOrNull(header.indexOf(Asset.QUALIFIED_NAME.atlanFieldName)) ?: "")
+            val connectionQNFromAsset = StringUtils.getConnectionQualifiedName(qualifiedName)
+            connectionQNs.add(connectionQNFromAsset)
             return row
         }
 
@@ -849,23 +902,31 @@ class AssetImporter(
         override fun finalize(
             header: List<String>,
             outputFile: String?,
-        ): RowPreprocessor.Results {
+        ): DeltaProcessor.Results {
             val results = super.finalize(header, outputFile)
             return Results(
+                connectionQN = if (connectionQNs.isNotEmpty()) connectionQNs.first() else NO_CONNECTION_QN,
+                multipleConnections = connectionQNs.size > 1,
                 hasLinks = results.hasLinks,
                 hasTermAssignments = results.hasTermAssignments,
+                outputFile = outputFile ?: filename,
                 typesInFile = typesInFile,
             )
         }
     }
 
     private class Results(
+        connectionQN: String,
+        multipleConnections: Boolean,
         hasLinks: Boolean,
         hasTermAssignments: Boolean,
+        outputFile: String,
         val typesInFile: Set<String>,
-    ) : RowPreprocessor.Results(
+    ) : DeltaProcessor.Results(
+            assetRootName = connectionQN,
             hasLinks = hasLinks,
             hasTermAssignments = hasTermAssignments,
-            outputFile = null,
+            multipleConnections = multipleConnections,
+            preprocessedFile = outputFile,
         )
 }
diff --git a/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/Importer.kt b/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/Importer.kt
index 09de7ce78b..85a1d8c594 100644
--- a/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/Importer.kt
+++ b/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/Importer.kt
@@ -7,6 +7,7 @@ import com.atlan.pkg.PackageContext
 import com.atlan.pkg.Utils
 import com.atlan.pkg.serde.FieldSerde
 import com.atlan.pkg.serde.csv.ImportResults
+import com.atlan.pkg.util.DeltaProcessor
 import kotlin.system.exitProcess
 
 /**
@@ -16,6 +17,8 @@ import kotlin.system.exitProcess
 object Importer {
     private val logger = Utils.getLogger(this.javaClass.name)
 
+    private const val PREVIOUS_FILES_PREFIX = "csa-asset-import"
+
     @JvmStatic
     fun main(args: Array<String>) {
         val outputDirectory = if (args.isEmpty()) "tmp" else args[0]
@@ -77,13 +80,44 @@ object Importer {
                         ctx.config.assetsKey,
                     )
                 FieldSerde.FAIL_ON_ERRORS.set(ctx.config.assetsFailOnErrors)
-                logger.info { "=== Importing assets... ===" }
+                val previousFileDirect = ctx.config.assetsPreviousFileDirect
                 val assetImporter = AssetImporter(ctx, assetsInput, logger)
-                val includes = assetImporter.preprocess()
-                if (includes.hasLinks) {
+                val preprocessedDetails = assetImporter.preprocess()
+                if (preprocessedDetails.hasLinks) {
                     ctx.linkCache.preload()
                 }
-                assetImporter.import()
+                DeltaProcessor(
+                    ctx = ctx,
+                    semantic = ctx.config.assetsDeltaSemantic,
+                    qualifiedNamePrefix = preprocessedDetails.assetRootName,
+                    removalType = ctx.config.assetsDeltaRemovalType,
+                    previousFilesPrefix = PREVIOUS_FILES_PREFIX,
+                    resolver = AssetImporter,
+                    preprocessedDetails = preprocessedDetails,
+                    typesToRemove = emptyList(),
+                    logger = logger,
+                    reloadSemantic = ctx.config.assetsDeltaReloadCalculation,
+                    previousFilePreprocessor =
+                        AssetImporter.Preprocessor(
+                            previousFileDirect,
+                            ctx.config.assetsFieldSeparator[0],
+                            logger,
+                        ),
+                    outputDirectory = outputDirectory,
+                ).use { delta ->
+
+                    delta.calculate()
+
+                    logger.info { "=== Importing assets... ===" }
+                    val importedAssets = assetImporter.import()
+
+                    delta.processDeletions()
+
+                    ImportResults.getAllModifiedAssets(ctx.client, true, importedAssets).use { modifiedAssets ->
+                        delta.updateConnectionCache(modifiedAssets)
+                    }
+                    importedAssets
+                }
             } else {
                 null
             }
@@ -116,14 +150,6 @@ object Importer {
             } else {
                 null
             }
-
-        ImportResults.getAllModifiedAssets(ctx.client, false, resultsAssets).use { allModified ->
-            Utils.updateConnectionCache(
-                client = ctx.client,
-                added = allModified,
-                fallback = outputDirectory,
-            )
-        }
         return ImportResults.combineAll(ctx.client, true, resultsGTC, resultsDDP, resultsAssets)
     }
 }
diff --git a/samples/packages/asset-import/src/main/resources/package.pkl b/samples/packages/asset-import/src/main/resources/package.pkl
index c224ceaefa..5e4d22d996 100644
--- a/samples/packages/asset-import/src/main/resources/package.pkl
+++ b/samples/packages/asset-import/src/main/resources/package.pkl
@@ -92,6 +92,46 @@ uiConfig {
           helpText = "Whether to allow the creation of new assets from the input CSV (full or partial assets), or ensure assets are only updated if they already exist in Atlan."
           fallback = default
         }
+        ["assets_delta_semantic"] = new Radio {
+          title = "Delta handling"
+          required = false
+          possibleValues {
+            ["full"] = "Full replacement"
+            ["delta"] = "Incremental"
+          }
+          default = "delta"
+          helpText = "Whether to treat the input file as a full replacement (deleting any existing assets not found in the file) or as an incremental load (no deletion of existing assets)."
+          fallback = default
+        }
+        ["assets_delta_removal_type"] = new Radio {
+          title = "Removal type"
+          required = false
+          possibleValues {
+            ["archive"] = "Archive (recoverable)"
+            ["purge"] = "Purge (cannot be recovered)"
+          }
+          default = "archive"
+          helpText = "How to delete any assets not found in the latest file."
+          fallback = default
+        }
+        ["assets_delta_reload_calculation"] = new Radio {
+          title = "Reload which assets"
+          required = false
+          possibleValues {
+            ["all"] = "All assets"
+            ["changes"] = "Changed assets only"
+          }
+          default = "all"
+          helpText = "Which assets to reload from the latest input CSV file. Changed assets only will calculate which assets have changed between the files and only attempt to reload those changes."
+          fallback = default
+        }
+        ["assets_previous_file_direct"] = new TextInput {
+          title = "Previous file"
+          required = false
+          hide = true
+          helpText = "Path to a local file containing the previous load, to use as the baseline for delta processing."
+          fallback = ""
+        }
         ["assets_config"] = new Radio {
           title = "Options"
           required = true
@@ -353,6 +393,10 @@ uiConfig {
       whenInputs { ["import_type"] = "CLOUD" }
       required { "cloud_source" "assets_prefix" "assets_key" "glossaries_prefix" "glossaries_key" "data_products_prefix" "data_products_key" }
     }
+    new UIRule {
+      whenInputs { ["assets_delta_semantic"] = "full" }
+      required { "assets_delta_removal_type" "assets_delta_reload_calculation" }
+    }
     new UIRule {
       whenInputs { ["assets_config"] = "advanced" }
       required {

From 13bdf810054f61256d700eff32e8f407892b3064 Mon Sep 17 00:00:00 2001
From: Christopher Grote <cmgrote@users.noreply.github.com>
Date: Mon, 27 Jan 2025 15:09:39 +0000
Subject: [PATCH 2/6] Fixes coordination of off-heap caches for delta
 processing

Signed-off-by: Christopher Grote <cmgrote@users.noreply.github.com>
---
 .../src/main/kotlin/com/atlan/pkg/aim/Importer.kt             | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/Importer.kt b/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/Importer.kt
index 85a1d8c594..33ad58df3f 100644
--- a/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/Importer.kt
+++ b/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/Importer.kt
@@ -113,7 +113,9 @@ object Importer {
 
                     delta.processDeletions()
 
-                    ImportResults.getAllModifiedAssets(ctx.client, true, importedAssets).use { modifiedAssets ->
+                    // Note: we won't close the original set of changes here, as we'll combine it later for a full set of changes
+                    // (at which point, it will be closed)
+                    ImportResults.getAllModifiedAssets(ctx.client, false, importedAssets).use { modifiedAssets ->
                         delta.updateConnectionCache(modifiedAssets)
                     }
                     importedAssets

From 259ffbb7dbf566de24c39d5fa644d94011a2b868 Mon Sep 17 00:00:00 2001
From: Christopher Grote <cmgrote@users.noreply.github.com>
Date: Mon, 27 Jan 2025 15:42:44 +0000
Subject: [PATCH 3/6] Embed delta processing into the asset importer

Signed-off-by: Christopher Grote <cmgrote@users.noreply.github.com>
---
 .../kotlin/com/atlan/pkg/aim/AssetImporter.kt | 43 +++++++++----------
 .../main/kotlin/com/atlan/pkg/aim/Importer.kt |  8 +++-
 2 files changed, 27 insertions(+), 24 deletions(-)

diff --git a/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/AssetImporter.kt b/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/AssetImporter.kt
index 064a9f34d7..94efac5b30 100644
--- a/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/AssetImporter.kt
+++ b/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/AssetImporter.kt
@@ -201,6 +201,7 @@ import com.atlan.pkg.serde.csv.CSVImporter
 import com.atlan.pkg.serde.csv.CSVPreprocessor
 import com.atlan.pkg.serde.csv.CSVXformer
 import com.atlan.pkg.serde.csv.ImportResults
+import com.atlan.pkg.serde.csv.RowPreprocessor
 import com.atlan.pkg.util.AssetResolver
 import com.atlan.pkg.util.AssetResolver.QualifiedNameDetails
 import com.atlan.pkg.util.DeltaProcessor
@@ -218,11 +219,13 @@ import java.io.IOException
  * asset in Atlan, then add that column's field to getAttributesToOverwrite.
  *
  * @param ctx context in which the package is running
+ * @param delta the processor containing any details about file deltas
  * @param filename name of the file to import
  * @param logger through which to write log entries
  */
 class AssetImporter(
     ctx: PackageContext<AssetImportCfg>,
+    private val delta: DeltaProcessor?,
     filename: String,
     logger: KLogger,
 ) : CSVImporter(
@@ -241,8 +244,6 @@ class AssetImporter(
     ) {
     private var header = emptyList<String>()
     private var typeToProcess = ""
-    private val connectionQNs = mutableSetOf<String>()
-    private val deltaProcessing = ctx.config.assetsDeltaSemantic == "full"
     private val cyclicalRelationships = mutableMapOf<String, MutableSet<RelationshipEnds>>()
     private val mapToSecondPass = mutableMapOf<String, MutableSet<String>>()
     private val secondPassRemain =
@@ -263,7 +264,7 @@ class AssetImporter(
     override fun preprocess(
         outputFile: String?,
         outputHeaders: List<String>?,
-    ): DeltaProcessor.Results {
+    ): RowPreprocessor.Results {
         // Retrieve all relationships and filter to any cyclical relationships
         // (meaning relationships where both ends are of the same type)
         val typeDefs = ctx.client.typeDefs.list(AtlanTypeCategory.RELATIONSHIP)
@@ -271,14 +272,7 @@ class AssetImporter(
             .stream()
             .filter { it.endDef1.type == it.endDef2.type }
             .forEach { cyclicalRelationships.getOrPut(it.endDef1.type) { mutableSetOf() }.add(RelationshipEnds(it.name, it.endDef1.name, it.endDef2.name)) }
-        val results = super.preprocess(outputFile, outputHeaders)
-        return DeltaProcessor.Results(
-            assetRootName = if (connectionQNs.isNotEmpty()) connectionQNs.first() else NO_CONNECTION_QN,
-            hasLinks = results.hasLinks,
-            hasTermAssignments = results.hasTermAssignments,
-            preprocessedFile = results.outputFile ?: filename,
-            multipleConnections = connectionQNs.size > 1,
-        )
+        return super.preprocess(outputFile, outputHeaders)
     }
 
     /** {@inheritDoc} */
@@ -311,11 +305,6 @@ class AssetImporter(
                 mapToSecondPass.getOrPut(typeName) { mutableSetOf() }.add(two)
             }
         }
-        if (deltaProcessing) {
-            val qualifiedName = CSVXformer.trimWhitespace(row.getOrNull(header.indexOf(Asset.QUALIFIED_NAME.atlanFieldName)) ?: "")
-            val connectionQNFromAsset = StringUtils.getConnectionQualifiedName(qualifiedName)
-            connectionQNs.add(connectionQNFromAsset)
-        }
         return row
     }
 
@@ -409,12 +398,22 @@ class AssetImporter(
         typeIdx: Int,
         qnIdx: Int,
     ): Boolean {
-        return if (updateOnly) {
-            // If we are only updating, process in-parallel, in any order
-            row.size >= typeIdx && CSVXformer.trimWhitespace(row.getOrElse(typeIdx) { "" }).isNotBlank()
+        val candidateRow =
+            if (updateOnly) {
+                // If we are only updating, process in-parallel, in any order
+                row.size >= typeIdx && CSVXformer.trimWhitespace(row.getOrElse(typeIdx) { "" }).isNotBlank()
+            } else {
+                // If we are doing more than only updates, process the assets in top-down order
+                row.size >= typeIdx && CSVXformer.trimWhitespace(row.getOrElse(typeIdx) { "" }) == typeToProcess
+            }
+        // Only proceed processing this candidate row if we're doing non-delta processing, or we have
+        // detected that it needs to be loaded via the delta processing
+        return if (candidateRow) {
+            delta?.resolveAsset(row, header)?.let { identity ->
+                delta.reloadAsset(identity)
+            } ?: true
         } else {
-            // If we are doing more than only updates, process the assets in top-down order
-            return row.size >= typeIdx && CSVXformer.trimWhitespace(row.getOrElse(typeIdx) { "" }) == typeToProcess
+            false
         }
     }
 
@@ -915,7 +914,7 @@ class AssetImporter(
         }
     }
 
-    private class Results(
+    class Results(
         connectionQN: String,
         multipleConnections: Boolean,
         hasLinks: Boolean,
diff --git a/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/Importer.kt b/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/Importer.kt
index 33ad58df3f..057c5c30a1 100644
--- a/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/Importer.kt
+++ b/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/Importer.kt
@@ -81,8 +81,10 @@ object Importer {
                     )
                 FieldSerde.FAIL_ON_ERRORS.set(ctx.config.assetsFailOnErrors)
                 val previousFileDirect = ctx.config.assetsPreviousFileDirect
-                val assetImporter = AssetImporter(ctx, assetsInput, logger)
-                val preprocessedDetails = assetImporter.preprocess()
+                val preprocessedDetails =
+                    AssetImporter
+                        .Preprocessor(assetsInput, ctx.config.assetsFieldSeparator[0], logger)
+                        .preprocess<AssetImporter.Results>()
                 if (preprocessedDetails.hasLinks) {
                     ctx.linkCache.preload()
                 }
@@ -109,6 +111,8 @@ object Importer {
                     delta.calculate()
 
                     logger.info { "=== Importing assets... ===" }
+                    val assetImporter = AssetImporter(ctx, delta, assetsInput, logger)
+                    assetImporter.preprocess() // Note: we still do this to detect any cyclical relationships
                     val importedAssets = assetImporter.import()
 
                     delta.processDeletions()

From 44a1b03cacd6e4ebe9cefea9db8fb9e91ec8b9c8 Mon Sep 17 00:00:00 2001
From: Christopher Grote <cmgrote@users.noreply.github.com>
Date: Mon, 27 Jan 2025 20:12:08 +0000
Subject: [PATCH 4/6] Adds initial delta-based tests

Signed-off-by: Christopher Grote <cmgrote@users.noreply.github.com>
---
 .../kotlin/com/atlan/pkg/aim/DeltaTest.kt     | 139 ++++++++++++++++++
 .../atlan/pkg/aim/MultiConnectionDeltaTest.kt |  82 +++++++++++
 .../src/test/resources/assets.csv             |   5 +
 .../src/test/resources/assets_latest.csv      |   5 +
 .../test/resources/multi-connection-delta.csv |   3 +
 5 files changed, 234 insertions(+)
 create mode 100644 samples/packages/asset-import/src/test/kotlin/com/atlan/pkg/aim/DeltaTest.kt
 create mode 100644 samples/packages/asset-import/src/test/kotlin/com/atlan/pkg/aim/MultiConnectionDeltaTest.kt
 create mode 100644 samples/packages/asset-import/src/test/resources/assets.csv
 create mode 100644 samples/packages/asset-import/src/test/resources/assets_latest.csv
 create mode 100644 samples/packages/asset-import/src/test/resources/multi-connection-delta.csv

diff --git a/samples/packages/asset-import/src/test/kotlin/com/atlan/pkg/aim/DeltaTest.kt b/samples/packages/asset-import/src/test/kotlin/com/atlan/pkg/aim/DeltaTest.kt
new file mode 100644
index 0000000000..46b5941e23
--- /dev/null
+++ b/samples/packages/asset-import/src/test/kotlin/com/atlan/pkg/aim/DeltaTest.kt
@@ -0,0 +1,139 @@
+/* SPDX-License-Identifier: Apache-2.0
+   Copyright 2023 Atlan Pte. Ltd. */
+package com.atlan.pkg.aim
+
+import com.atlan.model.assets.MaterializedView
+import com.atlan.model.assets.Schema
+import com.atlan.model.assets.View
+import com.atlan.model.enums.AtlanConnectorType
+import com.atlan.pkg.PackageTest
+import com.atlan.pkg.Utils
+import com.atlan.pkg.util.AssetResolver
+import com.atlan.pkg.util.FileBasedDelta
+import org.testng.Assert.assertTrue
+import java.io.File
+import java.nio.file.Paths
+import kotlin.test.Test
+import kotlin.test.assertEquals
+
+/**
+ * Test pre-processing of full-load CSV files to detect which assets should be removed.
+ */
+class DeltaTest : PackageTest("aid") {
+    override val logger = Utils.getLogger(this.javaClass.name)
+
+    private val conn1 = makeUnique("c1")
+    private val conn1Type = AtlanConnectorType.ICEBERG
+    private val conn1QN = "default/${conn1Type.value}/1234567890"
+
+    private val previousFile = "assets.csv"
+    private val currentFile = "assets_latest.csv"
+    private var delta: FileBasedDelta? = null
+
+    private val files =
+        listOf(
+            previousFile,
+            currentFile,
+            "debug.log",
+        )
+
+    private fun prepFile() {
+        // Prepare a copy of the file with unique names for connections
+        val previousIn = Paths.get("src", "test", "resources", previousFile).toFile()
+        val previousOut = Paths.get(testDirectory, previousFile).toFile()
+        replaceVars(previousIn, previousOut)
+        val latestIn = Paths.get("src", "test", "resources", currentFile).toFile()
+        val latestOut = Paths.get(testDirectory, currentFile).toFile()
+        replaceVars(latestIn, latestOut)
+    }
+
+    private fun replaceVars(
+        input: File,
+        output: File,
+    ) {
+        input.useLines { lines ->
+            lines.forEach { line ->
+                val revised =
+                    line
+                        .replace("{{CONNECTION}}", conn1QN)
+                output.appendText("$revised\n")
+            }
+        }
+    }
+
+    override fun setup() {
+        prepFile()
+        val connectionsMap =
+            mapOf(
+                AssetResolver.ConnectionIdentity(conn1, conn1Type.value) to conn1QN,
+            )
+        delta = FileBasedDelta(connectionsMap, AssetImporter, Utils.getLogger(this.javaClass.name), compareChecksums = true)
+        delta!!.calculateDelta(
+            Paths.get(testDirectory, currentFile).toString(),
+            Paths.get(testDirectory, previousFile).toString(),
+        )
+    }
+
+    @Test
+    fun hasSomethingToDelete() {
+        assertTrue(delta!!.hasAnythingToDelete())
+    }
+
+    @Test
+    fun totalAssetsToDelete() {
+        assertEquals(1, delta!!.assetsToDelete.size)
+        val types =
+            delta!!
+                .assetsToDelete
+                .entrySet()
+                .map { it.key.typeName }
+                .toList()
+                .toSet()
+        assertEquals(1, types.size)
+        assertTrue(types.contains(View.TYPE_NAME))
+    }
+
+    @Test
+    fun specificAssetsToDelete() {
+        delta!!.assetsToDelete.entrySet().forEach {
+            when (it.key.typeName) {
+                View.TYPE_NAME -> assertTrue("$conn1QN/DB/SCH/VIEW" == it.key.qualifiedName)
+            }
+        }
+    }
+
+    @Test
+    fun totalAssetsToReload() {
+        assertEquals(2, delta!!.assetsToReload.size)
+        val types =
+            delta!!
+                .assetsToReload
+                .entrySet()
+                .map { it.key.typeName }
+                .toList()
+                .toSet()
+        assertEquals(2, types.size)
+        assertTrue(types.contains(Schema.TYPE_NAME))
+        assertTrue(types.contains(MaterializedView.TYPE_NAME))
+    }
+
+    @Test
+    fun specificAssetsToReload() {
+        delta!!.assetsToReload.entrySet().forEach {
+            when (it.key.typeName) {
+                Schema.TYPE_NAME -> assertEquals("$conn1QN/DB/SCH", it.key.qualifiedName)
+                MaterializedView.TYPE_NAME -> assertEquals("$conn1QN/DB/SCH/MVIEW", it.key.qualifiedName)
+            }
+        }
+    }
+
+    @Test
+    fun filesCreated() {
+        validateFilesExist(files)
+    }
+
+    @Test
+    fun errorFreeLog() {
+        validateErrorFreeLog()
+    }
+}
diff --git a/samples/packages/asset-import/src/test/kotlin/com/atlan/pkg/aim/MultiConnectionDeltaTest.kt b/samples/packages/asset-import/src/test/kotlin/com/atlan/pkg/aim/MultiConnectionDeltaTest.kt
new file mode 100644
index 0000000000..a2f3d0458a
--- /dev/null
+++ b/samples/packages/asset-import/src/test/kotlin/com/atlan/pkg/aim/MultiConnectionDeltaTest.kt
@@ -0,0 +1,82 @@
+/* SPDX-License-Identifier: Apache-2.0
+   Copyright 2023 Atlan Pte. Ltd. */
+package com.atlan.pkg.aim
+
+import AssetImportCfg
+import com.atlan.model.assets.Connection
+import com.atlan.model.enums.AtlanConnectorType
+import com.atlan.pkg.PackageTest
+import com.atlan.pkg.Utils
+import java.nio.file.Paths
+import kotlin.test.Test
+import kotlin.test.assertEquals
+import kotlin.test.assertFailsWith
+
+/**
+ * Test that full delta processing fails with a meaningful error when the input file contains assets from multiple connections.
+ */
+class MultiConnectionDeltaTest : PackageTest("mcd") {
+    override val logger = Utils.getLogger(this.javaClass.name)
+
+    private val table = makeUnique("t1")
+
+    private val testFile = "multi-connection-delta.csv"
+
+    private val files =
+        listOf(
+            testFile,
+            "debug.log",
+        )
+
+    private fun prepFile(
+        connectionQN1: String,
+        connectionQN2: String,
+    ) {
+        val input = Paths.get("src", "test", "resources", testFile).toFile()
+        val output = Paths.get(testDirectory, testFile).toFile()
+        input.useLines { lines ->
+            lines.forEach { line ->
+                val revised =
+                    line
+                        .replace("{{CONNECTION1}}", connectionQN1)
+                        .replace("{{CONNECTION2}}", connectionQN2)
+                        .replace("{{TABLE}}", table)
+                output.appendText("$revised\n")
+            }
+        }
+    }
+
+    override fun setup() {
+        val snowflakeConnection = Connection.findByName(client, "production", AtlanConnectorType.SNOWFLAKE)?.get(0)!!
+        val mssqlConnection = Connection.findByName(client, "production", AtlanConnectorType.MSSQL)?.get(0)!!
+        prepFile(snowflakeConnection.qualifiedName, mssqlConnection.qualifiedName)
+    }
+
+    @Test
+    fun failsWithMeaningfulError() {
+        val exception =
+            assertFailsWith<IllegalStateException> {
+                runCustomPackage(
+                    AssetImportCfg(
+                        assetsFile = Paths.get(testDirectory, testFile).toString(),
+                        assetsUpsertSemantic = "upsert",
+                        assetsFailOnErrors = true,
+                        assetsDeltaSemantic = "full",
+                    ),
+                    Importer::main,
+                )
+            }
+        assertEquals(
+            """
+            Assets in multiple connections detected in the input file.
+            Full delta processing currently only works for a single connection per input file, exiting.
+            """.trimIndent(),
+            exception.message,
+        )
+    }
+
+    @Test
+    fun filesCreated() {
+        validateFilesExist(files)
+    }
+}
diff --git a/samples/packages/asset-import/src/test/resources/assets.csv b/samples/packages/asset-import/src/test/resources/assets.csv
new file mode 100644
index 0000000000..72a404f75f
--- /dev/null
+++ b/samples/packages/asset-import/src/test/resources/assets.csv
@@ -0,0 +1,5 @@
+qualifiedName,typeName,name,connectorType,connectionQualifiedName,databaseQualifiedName,databaseName,schemaQualifiedName,schemaName,description
+{{CONNECTION}}/DB,Database,DB,iceberg,{{CONNECTION}},,,,,
+{{CONNECTION}}/DB/SCH,Schema,SCH,iceberg,{{CONNECTION}},{{CONNECTION}}/DB,DB,,,A schema
+{{CONNECTION}}/DB/SCH/TBL,Table,TBL,iceberg,{{CONNECTION}},{{CONNECTION}}/DB,DB,{{CONNECTION}}/DB/SCH,SCH,
+{{CONNECTION}}/DB/SCH/VIEW,View,VIEW,iceberg,{{CONNECTION}},{{CONNECTION}}/DB,DB,{{CONNECTION}}/DB/SCH,SCH,
diff --git a/samples/packages/asset-import/src/test/resources/assets_latest.csv b/samples/packages/asset-import/src/test/resources/assets_latest.csv
new file mode 100644
index 0000000000..c6b99574bb
--- /dev/null
+++ b/samples/packages/asset-import/src/test/resources/assets_latest.csv
@@ -0,0 +1,5 @@
+qualifiedName,typeName,name,connectorType,connectionQualifiedName,databaseQualifiedName,databaseName,schemaQualifiedName,schemaName,description
+{{CONNECTION}}/DB,Database,DB,iceberg,{{CONNECTION}},,,,,
+{{CONNECTION}}/DB/SCH,Schema,SCH,iceberg,{{CONNECTION}},{{CONNECTION}}/DB,DB,,,A revised schema
+{{CONNECTION}}/DB/SCH/TBL,Table,TBL,iceberg,{{CONNECTION}},{{CONNECTION}}/DB,DB,{{CONNECTION}}/DB/SCH,SCH,
+{{CONNECTION}}/DB/SCH/MVIEW,MaterialisedView,MVIEW,iceberg,{{CONNECTION}},{{CONNECTION}}/DB,DB,{{CONNECTION}}/DB/SCH,SCH,
diff --git a/samples/packages/asset-import/src/test/resources/multi-connection-delta.csv b/samples/packages/asset-import/src/test/resources/multi-connection-delta.csv
new file mode 100644
index 0000000000..a4fec09832
--- /dev/null
+++ b/samples/packages/asset-import/src/test/resources/multi-connection-delta.csv
@@ -0,0 +1,3 @@
+qualifiedName,typeName,name,displayName,description,userDescription,ownerUsers,ownerGroups,certificateStatus,certificateStatusMessage,announcementType,announcementTitle,announcementMessage,assignedTerms,atlanTags,links,readme,starredDetails,connectorType,connectionQualifiedName,databaseQualifiedName,databaseName,schemaQualifiedName,schemaName
+{{CONNECTION1}}/ANALYTICS/WIDE_WORLD_IMPORTERS/{{TABLE}},Table,{{TABLE}},,,,,,VERIFIED,,,,,,NON_EXISTENT_TAG,,,,snowflake,{{CONNECTION1}},{{CONNECTION1}}/ANALYTICS,ANALYTICS,{{CONNECTION1}}/ANALYTICS/WIDE_WORLD_IMPORTERS,WIDE_WORLD_IMPORTERS
+{{CONNECTION2}}/ANALYTICS/WIDE_WORLD_IMPORTERS/{{TABLE}},Table,{{TABLE}},,,,,,VERIFIED,,,,,,NON_EXISTENT_TAG,,,,snowflake,{{CONNECTION2}},{{CONNECTION2}}/ANALYTICS,ANALYTICS,{{CONNECTION2}}/ANALYTICS/WIDE_WORLD_IMPORTERS,WIDE_WORLD_IMPORTERS

From 7b35a6cb7ad58b729311aa3dd02f26bc186823f1 Mon Sep 17 00:00:00 2001
From: Christopher Grote <cmgrote@users.noreply.github.com>
Date: Mon, 27 Jan 2025 21:31:49 +0000
Subject: [PATCH 5/6] Adds delta integration test

Signed-off-by: Christopher Grote <cmgrote@users.noreply.github.com>
---
 .../com/atlan/pkg/util/FileBasedDelta.kt      |   6 +-
 .../main/kotlin/com/atlan/pkg/aim/Importer.kt |   2 +-
 .../atlan/pkg/aim/CreateThenUpDeltaAIMTest.kt | 419 ++++++++++++++++++
 .../src/test/resources/assets.csv             |  10 +-
 .../src/test/resources/assets_latest.csv      |  10 +-
 5 files changed, 435 insertions(+), 12 deletions(-)
 create mode 100644 samples/packages/asset-import/src/test/kotlin/com/atlan/pkg/aim/CreateThenUpDeltaAIMTest.kt

diff --git a/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/util/FileBasedDelta.kt b/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/util/FileBasedDelta.kt
index 1ab6892baa..a408775f06 100644
--- a/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/util/FileBasedDelta.kt
+++ b/package-toolkit/runtime/src/main/kotlin/com/atlan/pkg/util/FileBasedDelta.kt
@@ -271,7 +271,11 @@ class FileBasedDelta(
             val deletionType = if (purge) AtlanDeleteType.PURGE else AtlanDeleteType.SOFT
             val guidList = guidsToDeleteToDetails.entrySet()
             val totalToDelete = guidsToDeleteToDetails.size
-            logger.info { " --- Deleting ($deletionType) $totalToDelete assets across $removeTypes... ---" }
+            if (removeTypes.isNotEmpty()) {
+                logger.info { " --- Deleting ($deletionType) $totalToDelete assets (limited to types: $removeTypes)... ---" }
+            } else {
+                logger.info { " --- Deleting ($deletionType) $totalToDelete assets... ---" }
+            }
             val currentCount = AtomicLong(0)
             if (totalToDelete < DELETION_BATCH) {
                 if (totalToDelete > 0) {
diff --git a/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/Importer.kt b/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/Importer.kt
index 057c5c30a1..343224da5d 100644
--- a/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/Importer.kt
+++ b/samples/packages/asset-import/src/main/kotlin/com/atlan/pkg/aim/Importer.kt
@@ -17,7 +17,7 @@ import kotlin.system.exitProcess
 object Importer {
     private val logger = Utils.getLogger(this.javaClass.name)
 
-    private const val PREVIOUS_FILES_PREFIX = "csa-asset-import"
+    const val PREVIOUS_FILES_PREFIX = "csa-asset-import"
 
     @JvmStatic
     fun main(args: Array<String>) {
diff --git a/samples/packages/asset-import/src/test/kotlin/com/atlan/pkg/aim/CreateThenUpDeltaAIMTest.kt b/samples/packages/asset-import/src/test/kotlin/com/atlan/pkg/aim/CreateThenUpDeltaAIMTest.kt
new file mode 100644
index 0000000000..cb63bd0316
--- /dev/null
+++ b/samples/packages/asset-import/src/test/kotlin/com/atlan/pkg/aim/CreateThenUpDeltaAIMTest.kt
@@ -0,0 +1,419 @@
+/* SPDX-License-Identifier: Apache-2.0
+   Copyright 2023 Atlan Pte. Ltd. */
+package com.atlan.pkg.aim
+
+import AssetImportCfg
+import com.atlan.model.assets.Asset
+import com.atlan.model.assets.Connection
+import com.atlan.model.assets.Database
+import com.atlan.model.assets.MaterializedView
+import com.atlan.model.assets.Schema
+import com.atlan.model.assets.Table
+import com.atlan.model.assets.View
+import com.atlan.model.enums.AtlanConnectorType
+import com.atlan.model.enums.AtlanStatus
+import com.atlan.model.fields.AtlanField
+import com.atlan.pkg.PackageTest
+import com.atlan.pkg.Utils
+import com.atlan.pkg.cache.PersistentConnectionCache
+import org.testng.Assert.assertFalse
+import org.testng.Assert.assertTrue
+import java.nio.file.Paths
+import kotlin.test.Test
+import kotlin.test.assertEquals
+import kotlin.test.assertNotNull
+
+/**
+ * Test creation of assets followed by an upsert of the same assets, including calculating a delta.
+ */
+class CreateThenUpDeltaAIMTest : PackageTest("ctuda") {
+    override val logger = Utils.getLogger(this.javaClass.name)
+
+    private val conn1 = makeUnique("c1")
+    private val conn1Type = AtlanConnectorType.IBM_DB2
+    private lateinit var connection: Connection
+
+    private val testFile = "assets.csv"
+    private val revisedFile = "revised.csv"
+
+    private val files =
+        listOf(
+            testFile,
+            "debug.log",
+        )
+
+    private fun prepFile(connectionQN: String = connection.qualifiedName) {
+        // Prepare a copy of the file with unique names for connections
+        val input = Paths.get("src", "test", "resources", testFile).toFile()
+        val output = Paths.get(testDirectory, testFile).toFile()
+        input.useLines { lines ->
+            lines.forEach { line ->
+                val revised =
+                    line
+                        .replace("iceberg", "ibmdb2")
+                        .replace("{{CONNECTION}}", connectionQN)
+                output.appendText("$revised\n")
+            }
+        }
+    }
+
+    private fun modifyFile(connectionQN: String = connection.qualifiedName) {
+        // Modify the loaded file to make some changes (testing upsert)
+        val input = Paths.get(testDirectory, testFile).toFile()
+        val output = Paths.get(testDirectory, revisedFile).toFile()
+        input.useLines { lines ->
+            lines.forEach { line ->
+                if (!line.contains("VIEW")) {
+                    val revised =
+                        line
+                            .replace("A schema", "Revised schema description")
+                    output.appendText("$revised\n")
+                }
+            }
+        }
+        // Create some net-new assets
+        output.appendText("$connectionQN/DB/SCH/VIEW2,View,VIEW2,ibmdb2,$connectionQN,$connectionQN/DB,DB,$connectionQN/DB/SCH,SCH,,,Schema@$connectionQN/DB/SCH\n")
+        output.appendText("$connectionQN/DB/SCH/MVIEW1,MaterialisedView,MVIEW1,ibmdb2,$connectionQN,$connectionQN/DB,DB,$connectionQN/DB/SCH,SCH,,,Schema@$connectionQN/DB/SCH\n")
+    }
+
+    private val connectionAttrs: List<AtlanField> =
+        listOf(
+            Connection.NAME,
+            Connection.CONNECTOR_TYPE,
+            Connection.ADMIN_ROLES,
+            Connection.ADMIN_GROUPS,
+            Connection.ADMIN_USERS,
+        )
+
+    private val databaseAttrs: List<AtlanField> =
+        listOf(
+            Database.NAME,
+            Database.CONNECTION_QUALIFIED_NAME,
+            Database.CONNECTOR_TYPE,
+            Database.DESCRIPTION,
+            Database.SCHEMAS,
+        )
+
+    private val schemaAttrs: List<AtlanField> =
+        listOf(
+            Schema.NAME,
+            Schema.CONNECTION_QUALIFIED_NAME,
+            Schema.CONNECTOR_TYPE,
+            Schema.DESCRIPTION,
+            Schema.DATABASE_NAME,
+            Schema.DATABASE_QUALIFIED_NAME,
+            Schema.TABLES,
+            Schema.VIEWS,
+            Schema.MATERIALIZED_VIEWS,
+        )
+
+    private val tableAttrs: List<AtlanField> =
+        listOf(
+            Table.NAME,
+            Table.STATUS,
+            Table.CONNECTION_QUALIFIED_NAME,
+            Table.CONNECTOR_TYPE,
+            Table.DATABASE_NAME,
+            Table.DATABASE_QUALIFIED_NAME,
+            Table.SCHEMA_NAME,
+            Table.SCHEMA_QUALIFIED_NAME,
+            Table.SCHEMA,
+        )
+
+    private fun createConnection(): Connection {
+        val c1 = Connection.creator(client, conn1, conn1Type).build()
+        val response = c1.save(client).block()
+        return response.getResult(c1)
+    }
+
+    override fun setup() {
+        connection = createConnection()
+        prepFile()
+        runCustomPackage(
+            AssetImportCfg(
+                assetsFile = Paths.get(testDirectory, testFile).toString(),
+                assetsUpsertSemantic = "upsert",
+                assetsFailOnErrors = true,
+                assetsDeltaSemantic = "full",
+            ),
+            Importer::main,
+        )
+    }
+
+    override fun teardown() {
+        removeConnection(conn1, conn1Type)
+    }
+
+    @Test(groups = ["aim.ctud.create"])
+    fun connection1Created() {
+        validateConnection()
+    }
+
+    private fun validateConnection() {
+        val found = Connection.findByName(client, conn1, conn1Type, connectionAttrs)
+        assertNotNull(found)
+        assertEquals(1, found.size)
+        val c1 = found[0]
+        assertEquals(conn1, c1.name)
+        assertEquals(conn1Type, c1.connectorType)
+    }
+
+    @Test(groups = ["aim.ctud.create"])
+    fun database1Created() {
+        validateDatabase()
+    }
+
+    private fun validateDatabase() {
+        val c1 = Connection.findByName(client, conn1, conn1Type, connectionAttrs)[0]!!
+        val request =
+            Database
+                .select(client)
+                .where(Database.CONNECTION_QUALIFIED_NAME.eq(c1.qualifiedName))
+                .includesOnResults(databaseAttrs)
+                .includeOnRelations(Schema.NAME)
+                .toRequest()
+        val response = retrySearchUntil(request, 1)
+        val found = response.assets
+        assertEquals(1, found.size)
+        val db = found[0] as Database
+        assertEquals("DB", db.name)
+        assertEquals(c1.qualifiedName, db.connectionQualifiedName)
+        assertEquals(conn1Type, db.connectorType)
+        assertEquals(1, db.schemas.size)
+        assertEquals("SCH", db.schemas.first().name)
+    }
+
+    @Test(groups = ["aim.ctud.create"])
+    fun schema1Created() {
+        validateSchema("A schema")
+    }
+
+    private fun validateSchema(description: String) {
+        val c1 = Connection.findByName(client, conn1, conn1Type, connectionAttrs)[0]!!
+        val request =
+            Schema
+                .select(client)
+                .where(Schema.CONNECTION_QUALIFIED_NAME.eq(c1.qualifiedName))
+                .includesOnResults(schemaAttrs)
+                .includeOnRelations(Asset.NAME)
+                .toRequest()
+        val response = retrySearchUntil(request, 1)
+        val found = response.assets
+        assertEquals(1, found.size)
+        val sch = found[0] as Schema
+        assertEquals("SCH", sch.name)
+        assertEquals(description, sch.description)
+        assertEquals(c1.qualifiedName, sch.connectionQualifiedName)
+        assertEquals(conn1Type, sch.connectorType)
+        assertEquals("DB", sch.databaseName)
+        assertTrue(sch.databaseQualifiedName.endsWith("/DB"))
+        assertEquals(1, sch.tables.size)
+        assertEquals("TBL", sch.tables.first().name)
+        if (description == "Revised schema description") {
+            assertEquals(1, sch.views.size)
+            assertEquals("VIEW2", sch.views.first().name)
+            assertEquals(1, sch.materializedViews.size)
+            assertEquals("MVIEW1", sch.materializedViews.first().name)
+        } else {
+            assertEquals(1, sch.views.size)
+            assertEquals("VIEW", sch.views.first().name)
+        }
+    }
+
+    @Test(groups = ["aim.ctud.create"])
+    fun table1Created() {
+        validateTable()
+    }
+
+    private fun validateTable() {
+        val c1 = Connection.findByName(client, conn1, conn1Type, connectionAttrs)[0]!!
+        val request =
+            Table
+                .select(client)
+                .where(Table.CONNECTION_QUALIFIED_NAME.eq(c1.qualifiedName))
+                .includesOnResults(tableAttrs)
+                .includeOnRelations(Schema.NAME)
+                .toRequest()
+        val response = retrySearchUntil(request, 1)
+        val found = response.assets
+        assertEquals(1, found.size)
+        val tbl = found[0] as Table
+        assertEquals("TBL", tbl.name)
+        assertEquals(c1.qualifiedName, tbl.connectionQualifiedName)
+        assertEquals(conn1Type, tbl.connectorType)
+        assertEquals("DB", tbl.databaseName)
+        assertTrue(tbl.databaseQualifiedName.endsWith("/DB"))
+        assertEquals("SCH", tbl.schemaName)
+        assertTrue(tbl.schemaQualifiedName.endsWith("/DB/SCH"))
+        assertEquals("SCH", tbl.schema.name)
+    }
+
+    @Test(groups = ["aim.ctud.create"])
+    fun view1Created() {
+        validateView()
+    }
+
+    private fun validateView(exists: Boolean = true) {
+        val c1 = Connection.findByName(client, conn1, conn1Type, connectionAttrs)[0]!!
+        if (!exists) {
+            val request =
+                View
+                    .select(client, true)
+                    .where(View.CONNECTION_QUALIFIED_NAME.eq(c1.qualifiedName))
+                    .where(View.STATUS.eq(AtlanStatus.DELETED))
+                    .includesOnResults(tableAttrs)
+                    .toRequest()
+            val response = retrySearchUntil(request, 1, true)
+            val found = response.assets
+            assertEquals(1, found.size)
+            val view = found[0] as View
+            if (view.status != AtlanStatus.DELETED) {
+                logger.error { "Exact request: ${request.toJson(client)}" }
+                logger.error { "Exact response: ${response.rawJsonObject}" }
+            }
+            assertEquals(AtlanStatus.DELETED, view.status)
+        } else {
+            val request =
+                View
+                    .select(client)
+                    .where(View.CONNECTION_QUALIFIED_NAME.eq(c1.qualifiedName))
+                    .includesOnResults(tableAttrs)
+                    .includeOnRelations(Schema.NAME)
+                    .toRequest()
+            val response = retrySearchUntil(request, 1)
+            val found = response.assets
+            assertEquals(1, found.size)
+            val view = found[0] as View
+            assertEquals("VIEW", view.name)
+            assertEquals(c1.qualifiedName, view.connectionQualifiedName)
+            assertEquals(conn1Type, view.connectorType)
+            assertEquals("SCH", view.schema.name)
+        }
+    }
+
+    @Test(groups = ["aim.ctud.create"])
+    fun connectionCacheCreated() {
+        validateConnectionCache()
+    }
+
+    private fun validateConnectionCache(created: Boolean = true) {
+        val c1 = Connection.findByName(client, conn1, conn1Type, connectionAttrs)[0]!!
+        val dbFile = Paths.get(testDirectory, "connection-cache", "${c1.qualifiedName}.sqlite").toFile()
+        assertTrue(dbFile.isFile)
+        assertTrue(dbFile.exists())
+        val cache = PersistentConnectionCache(dbFile.path)
+        val assets = cache.listAssets()
+        assertNotNull(assets)
+        assertFalse(assets.isEmpty())
+        if (created) {
+            assertEquals(4, assets.size)
+            assertEquals(setOf(Database.TYPE_NAME, Schema.TYPE_NAME, Table.TYPE_NAME, View.TYPE_NAME), assets.map { it.typeName }.toSet())
+            assertEquals(1, assets.count { it.typeName == Table.TYPE_NAME })
+            assertEquals(1, assets.count { it.typeName == View.TYPE_NAME })
+            assertEquals(setOf("VIEW"), assets.filter { it.typeName == View.TYPE_NAME }.map { it.name }.toSet())
+        } else {
+            assertEquals(5, assets.size)
+            assertEquals(setOf(Database.TYPE_NAME, Schema.TYPE_NAME, Table.TYPE_NAME, View.TYPE_NAME, MaterializedView.TYPE_NAME), assets.map { it.typeName }.toSet())
+            assertEquals(1, assets.count { it.typeName == Table.TYPE_NAME })
+            assertEquals(1, assets.count { it.typeName == View.TYPE_NAME })
+            assertEquals(1, assets.count { it.typeName == MaterializedView.TYPE_NAME })
+            assertEquals(setOf("VIEW2"), assets.filter { it.typeName == View.TYPE_NAME }.map { it.name }.toSet())
+        }
+    }
+
+    @Test(groups = ["aim.ctud.runUpdate"], dependsOnGroups = ["aim.ctud.create"])
+    fun upsertRevisions() {
+        modifyFile()
+        runCustomPackage(
+            AssetImportCfg(
+                assetsFile = Paths.get(testDirectory, revisedFile).toString(),
+                assetsUpsertSemantic = "upsert",
+                assetsFailOnErrors = true,
+                assetsDeltaSemantic = "full",
+                assetsDeltaReloadCalculation = "changes",
+                assetsPreviousFileDirect = Paths.get(testDirectory, testFile).toString(),
+            ),
+            Importer::main,
+        )
+        // Allow Elastic index and deletion to become consistent
+        Thread.sleep(15000)
+        val c1 = Connection.findByName(client, conn1, conn1Type, connectionAttrs)[0]!!
+        val request =
+            MaterializedView
+                .select(client)
+                .where(MaterializedView.CONNECTION_QUALIFIED_NAME.eq(c1.qualifiedName))
+                .where(MaterializedView.NAME.eq("MVIEW1"))
+                .toRequest()
+        retrySearchUntil(request, 1)
+    }
+
+    @Test(groups = ["aim.ctud.update"], dependsOnGroups = ["aim.ctud.runUpdate"])
+    fun connectionUnchanged() {
+        validateConnection()
+    }
+
+    @Test(groups = ["aim.ctud.update"], dependsOnGroups = ["aim.ctud.runUpdate"])
+    fun databaseUnchanged() {
+        validateDatabase()
+    }
+
+    @Test(groups = ["aim.ctud.update"], dependsOnGroups = ["aim.ctud.runUpdate"])
+    fun schemaChanged() {
+        validateSchema("Revised schema description")
+    }
+
+    @Test(groups = ["aim.ctud.update"], dependsOnGroups = ["aim.ctud.runUpdate"])
+    fun tableUnchanged() {
+        validateTable()
+    }
+
+    @Test(groups = ["aim.ctud.update"], dependsOnGroups = ["aim.ctud.runUpdate"])
+    fun viewRemoved() {
+        validateView(false)
+    }
+
+    @Test(groups = ["aim.ctud.update"], dependsOnGroups = ["aim.ctud.runUpdate"])
+    fun entirelyNewView() {
+        val c1 = Connection.findByName(client, conn1, conn1Type, connectionAttrs)[0]!!
+        val request =
+            View
+                .select(client)
+                .where(View.CONNECTION_QUALIFIED_NAME.eq(c1.qualifiedName))
+                .includesOnResults(tableAttrs)
+                .includeOnRelations(Schema.NAME)
+                .toRequest()
+        val response = retrySearchUntil(request, 1)
+        val found = response.assets
+        assertEquals(1, found.size)
+        val view = found[0] as View
+        assertEquals("VIEW2", view.name)
+        assertEquals(c1.qualifiedName, view.connectionQualifiedName)
+        assertEquals(conn1Type, view.connectorType)
+        assertEquals("SCH", view.schema.name)
+    }
+
+    @Test(groups = ["aim.ctud.update"], dependsOnGroups = ["aim.ctud.runUpdate"])
+    fun connectionCacheUpdated() {
+        validateConnectionCache(false)
+    }
+
+    @Test(dependsOnGroups = ["aim.ctud.*"])
+    fun filesCreated() {
+        validateFilesExist(files)
+    }
+
+    @Test(dependsOnGroups = ["aim.ctud.*"])
+    fun previousRunFilesCreated() {
+        val c1 = Connection.findByName(client, conn1, conn1Type, connectionAttrs)[0]!!
+        val directory = Paths.get(testDirectory, Importer.PREVIOUS_FILES_PREFIX, c1.qualifiedName).toFile()
+        assertNotNull(directory)
+        assertTrue(directory.isDirectory)
+        val files = directory.walkTopDown().filter { it.isFile }.toList()
+        assertEquals(2, files.size)
+    }
+
+    @Test(dependsOnGroups = ["aim.ctud.*"])
+    fun errorFreeLog() {
+        validateErrorFreeLog()
+    }
+}
diff --git a/samples/packages/asset-import/src/test/resources/assets.csv b/samples/packages/asset-import/src/test/resources/assets.csv
index 72a404f75f..9af810e2d0 100644
--- a/samples/packages/asset-import/src/test/resources/assets.csv
+++ b/samples/packages/asset-import/src/test/resources/assets.csv
@@ -1,5 +1,5 @@
-qualifiedName,typeName,name,connectorType,connectionQualifiedName,databaseQualifiedName,databaseName,schemaQualifiedName,schemaName,description
-{{CONNECTION}}/DB,Database,DB,iceberg,{{CONNECTION}},,,,,
-{{CONNECTION}}/DB/SCH,Schema,SCH,iceberg,{{CONNECTION}},{{CONNECTION}}/DB,DB,,,A schema
-{{CONNECTION}}/DB/SCH/TBL,Table,TBL,iceberg,{{CONNECTION}},{{CONNECTION}}/DB,DB,{{CONNECTION}}/DB/SCH,SCH,
-{{CONNECTION}}/DB/SCH/VIEW,View,VIEW,iceberg,{{CONNECTION}},{{CONNECTION}}/DB,DB,{{CONNECTION}}/DB/SCH,SCH,
+qualifiedName,typeName,name,connectorType,connectionQualifiedName,databaseQualifiedName,databaseName,schemaQualifiedName,schemaName,description,database,schema
+{{CONNECTION}}/DB,Database,DB,iceberg,{{CONNECTION}},,,,,,,
+{{CONNECTION}}/DB/SCH,Schema,SCH,iceberg,{{CONNECTION}},{{CONNECTION}}/DB,DB,,,A schema,Database@{{CONNECTION}}/DB,
+{{CONNECTION}}/DB/SCH/TBL,Table,TBL,iceberg,{{CONNECTION}},{{CONNECTION}}/DB,DB,{{CONNECTION}}/DB/SCH,SCH,,,Schema@{{CONNECTION}}/DB/SCH
+{{CONNECTION}}/DB/SCH/VIEW,View,VIEW,iceberg,{{CONNECTION}},{{CONNECTION}}/DB,DB,{{CONNECTION}}/DB/SCH,SCH,,,Schema@{{CONNECTION}}/DB/SCH
diff --git a/samples/packages/asset-import/src/test/resources/assets_latest.csv b/samples/packages/asset-import/src/test/resources/assets_latest.csv
index c6b99574bb..85638f5cfa 100644
--- a/samples/packages/asset-import/src/test/resources/assets_latest.csv
+++ b/samples/packages/asset-import/src/test/resources/assets_latest.csv
@@ -1,5 +1,5 @@
-qualifiedName,typeName,name,connectorType,connectionQualifiedName,databaseQualifiedName,databaseName,schemaQualifiedName,schemaName,description
-{{CONNECTION}}/DB,Database,DB,iceberg,{{CONNECTION}},,,,,
-{{CONNECTION}}/DB/SCH,Schema,SCH,iceberg,{{CONNECTION}},{{CONNECTION}}/DB,DB,,,A revised schema
-{{CONNECTION}}/DB/SCH/TBL,Table,TBL,iceberg,{{CONNECTION}},{{CONNECTION}}/DB,DB,{{CONNECTION}}/DB/SCH,SCH,
-{{CONNECTION}}/DB/SCH/MVIEW,MaterialisedView,MVIEW,iceberg,{{CONNECTION}},{{CONNECTION}}/DB,DB,{{CONNECTION}}/DB/SCH,SCH,
+qualifiedName,typeName,name,connectorType,connectionQualifiedName,databaseQualifiedName,databaseName,schemaQualifiedName,schemaName,description,database,schema
+{{CONNECTION}}/DB,Database,DB,iceberg,{{CONNECTION}},,,,,,,
+{{CONNECTION}}/DB/SCH,Schema,SCH,iceberg,{{CONNECTION}},{{CONNECTION}}/DB,DB,,,A revised schema,Database@{{CONNECTION}}/DB,
+{{CONNECTION}}/DB/SCH/TBL,Table,TBL,iceberg,{{CONNECTION}},{{CONNECTION}}/DB,DB,{{CONNECTION}}/DB/SCH,SCH,,,Schema@{{CONNECTION}}/DB/SCH
+{{CONNECTION}}/DB/SCH/MVIEW,MaterialisedView,MVIEW,iceberg,{{CONNECTION}},{{CONNECTION}}/DB,DB,{{CONNECTION}}/DB/SCH,SCH,,,Schema@{{CONNECTION}}/DB/SCH

From f3f55f11936d975ce826603f7717abc19682c898 Mon Sep 17 00:00:00 2001
From: Christopher Grote <cmgrote@users.noreply.github.com>
Date: Tue, 28 Jan 2025 09:38:08 +0000
Subject: [PATCH 6/6] Dependency bumps

Signed-off-by: Christopher Grote <cmgrote@users.noreply.github.com>
---
 buildSrc/build.gradle.kts | 2 +-
 gradle/libs.versions.toml | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/buildSrc/build.gradle.kts b/buildSrc/build.gradle.kts
index 433c351246..afb36f0443 100644
--- a/buildSrc/build.gradle.kts
+++ b/buildSrc/build.gradle.kts
@@ -9,7 +9,7 @@ repositories {
 }
 
 dependencies {
-    implementation("org.jetbrains.kotlin.jvm:org.jetbrains.kotlin.jvm.gradle.plugin:2.1.0")
+    implementation("org.jetbrains.kotlin.jvm:org.jetbrains.kotlin.jvm.gradle.plugin:2.1.10")
     implementation("com.diffplug.spotless:spotless-plugin-gradle:7.0.2")
     implementation("io.freefair.gradle:lombok-plugin:8.12")
     implementation("net.ltgt.gradle:gradle-errorprone-plugin:4.0.1")
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index a1301ae6be..cbd01738aa 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -8,7 +8,7 @@ testng = "7.10.2"
 log4j = "2.24.3"
 wiremock = "3.10.0"
 jnanoid = "2.0.0"
-awssdk = "2.30.6"
+awssdk = "2.30.7"
 gcs = "26.51.0"  # was: "26.53.0"
 system-stubs = "2.1.7"
 fastcsv = "3.4.0"
@@ -27,7 +27,7 @@ adls = "12.22.0"
 azure = "1.15.0"
 guava = "33.4.0-jre"
 openlineage = "1.27.0"
-kotlin = "2.1.0"
+kotlin = "2.1.10"
 kotlin-mu = "3.0.5"
 rocksdb = "9.10.0"
 jetty = "12.0.16"