Skip to content

Commit

Permalink
Merge pull request #1256 from atlanhq/APP-5038
Browse files Browse the repository at this point in the history
Initial work for delta processing in asset import
  • Loading branch information
cmgrote authored Jan 28, 2025
2 parents 8bd0a95 + f3f55f1 commit d79d3c7
Show file tree
Hide file tree
Showing 14 changed files with 837 additions and 30 deletions.
2 changes: 1 addition & 1 deletion buildSrc/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ repositories {
}

dependencies {
implementation("org.jetbrains.kotlin.jvm:org.jetbrains.kotlin.jvm.gradle.plugin:2.1.0")
implementation("org.jetbrains.kotlin.jvm:org.jetbrains.kotlin.jvm.gradle.plugin:2.1.10")
implementation("com.diffplug.spotless:spotless-plugin-gradle:7.0.2")
implementation("io.freefair.gradle:lombok-plugin:8.12")
implementation("net.ltgt.gradle:gradle-errorprone-plugin:4.0.1")
Expand Down
4 changes: 2 additions & 2 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ testng = "7.10.2"
log4j = "2.24.3"
wiremock = "3.10.0"
jnanoid = "2.0.0"
awssdk = "2.30.6"
awssdk = "2.30.7"
gcs = "26.51.0" # was: "26.53.0"
system-stubs = "2.1.7"
fastcsv = "3.4.0"
Expand All @@ -27,7 +27,7 @@ adls = "12.22.0"
azure = "1.15.0"
guava = "33.4.0-jre"
openlineage = "1.27.0"
kotlin = "2.1.0"
kotlin = "2.1.10"
kotlin-mu = "3.0.5"
rocksdb = "9.10.0"
jetty = "12.0.16"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ class DeltaProcessor(
*/
fun calculate() {
if (semantic == "full") {
if (preprocessedDetails.multipleConnections) {
throw IllegalStateException(
"""
Assets in multiple connections detected in the input file.
Full delta processing currently only works for a single connection per input file, exiting.
""".trimIndent(),
)
}
if (qualifiedNamePrefix.isNullOrBlank()) {
logger.warn { "Unable to determine qualifiedName prefix, cannot calculate any delta." }
} else {
Expand Down Expand Up @@ -219,12 +227,14 @@ class DeltaProcessor(
* @param hasLinks whether there are any links in the input file
* @param hasTermAssignments whether there are any term assignments in the input file
* @param preprocessedFile full path to the preprocessed input file
* @param multipleConnections whether multiple connections were present in the input file (true) or only a single connection (false)
*/
open class Results(
val assetRootName: String,
hasLinks: Boolean,
hasTermAssignments: Boolean,
val preprocessedFile: String,
val multipleConnections: Boolean = false,
) : RowPreprocessor.Results(
hasLinks = hasLinks,
hasTermAssignments = hasTermAssignments,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,11 @@ class FileBasedDelta(
val deletionType = if (purge) AtlanDeleteType.PURGE else AtlanDeleteType.SOFT
val guidList = guidsToDeleteToDetails.entrySet()
val totalToDelete = guidsToDeleteToDetails.size
logger.info { " --- Deleting ($deletionType) $totalToDelete assets across $removeTypes... ---" }
if (removeTypes.isNotEmpty()) {
logger.info { " --- Deleting ($deletionType) $totalToDelete assets (limited to types: $removeTypes)... ---" }
} else {
logger.info { " --- Deleting ($deletionType) $totalToDelete assets... ---" }
}
val currentCount = AtomicLong(0)
if (totalToDelete < DELETION_BATCH) {
if (totalToDelete > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ data class AssetImportCfg(
@JsonProperty("assets_prefix") val assetsPrefix: String = "",
@JsonProperty("assets_key") val assetsKey: String = "",
@JsonProperty("assets_upsert_semantic") val assetsUpsertSemantic: String = "update",
@JsonProperty("assets_delta_semantic") val assetsDeltaSemantic: String = "delta",
@JsonProperty("assets_delta_removal_type") val assetsDeltaRemovalType: String = "archive",
@JsonProperty("assets_delta_reload_calculation") val assetsDeltaReloadCalculation: String = "all",
@JsonProperty("assets_previous_file_direct") val assetsPreviousFileDirect: String = "",
@JsonProperty("assets_config") val assetsConfig: String? = null,
@JsonDeserialize(using = WidgetSerde.MultiSelectDeserializer::class)
@JsonSerialize(using = WidgetSerde.MultiSelectSerializer::class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,13 @@ import com.atlan.pkg.serde.csv.CSVPreprocessor
import com.atlan.pkg.serde.csv.CSVXformer
import com.atlan.pkg.serde.csv.ImportResults
import com.atlan.pkg.serde.csv.RowPreprocessor
import com.atlan.pkg.util.AssetResolver
import com.atlan.pkg.util.AssetResolver.QualifiedNameDetails
import com.atlan.pkg.util.DeltaProcessor
import com.atlan.util.AssetBatch.AssetIdentity
import com.atlan.util.StringUtils
import mu.KLogger
import java.io.IOException

/**
* Import assets into Atlan from a provided CSV file.
Expand All @@ -213,11 +219,13 @@ import mu.KLogger
* asset in Atlan, then add that column's field to getAttributesToOverwrite.
*
* @param ctx context in which the package is running
* @param delta the processor containing any details about file deltas
* @param filename name of the file to import
* @param logger through which to write log entries
*/
class AssetImporter(
ctx: PackageContext<AssetImportCfg>,
private val delta: DeltaProcessor?,
filename: String,
logger: KLogger,
) : CSVImporter(
Expand Down Expand Up @@ -264,8 +272,7 @@ class AssetImporter(
.stream()
.filter { it.endDef1.type == it.endDef2.type }
.forEach { cyclicalRelationships.getOrPut(it.endDef1.type) { mutableSetOf() }.add(RelationshipEnds(it.name, it.endDef1.name, it.endDef2.name)) }
val results = super.preprocess(outputFile, outputHeaders)
return results
return super.preprocess(outputFile, outputHeaders)
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -391,12 +398,22 @@ class AssetImporter(
typeIdx: Int,
qnIdx: Int,
): Boolean {
return if (updateOnly) {
// If we are only updating, process in-parallel, in any order
row.size >= typeIdx && CSVXformer.trimWhitespace(row.getOrElse(typeIdx) { "" }).isNotBlank()
val candidateRow =
if (updateOnly) {
// If we are only updating, process in-parallel, in any order
row.size >= typeIdx && CSVXformer.trimWhitespace(row.getOrElse(typeIdx) { "" }).isNotBlank()
} else {
// If we are doing more than only updates, process the assets in top-down order
row.size >= typeIdx && CSVXformer.trimWhitespace(row.getOrElse(typeIdx) { "" }) == typeToProcess
}
// Only proceed processing this candidate row if we're doing non-delta processing, or we have
// detected that it needs to be loaded via the delta processing
return if (candidateRow) {
delta?.resolveAsset(row, header)?.let { identity ->
delta.reloadAsset(identity)
} ?: true
} else {
// If we are doing more than only updates, process the assets in top-down order
return row.size >= typeIdx && CSVXformer.trimWhitespace(row.getOrElse(typeIdx) { "" }) == typeToProcess
false
}
}

Expand All @@ -405,7 +422,8 @@ class AssetImporter(
val types: List<String>,
)

companion object {
companion object : AssetResolver {
const val NO_CONNECTION_QN = "NO_CONNECTION_FOUND"
private val ordering =
listOf(
TypeGrouping(
Expand Down Expand Up @@ -814,12 +832,42 @@ class AssetImporter(
types.sortedBy { t ->
ordering.flatMap { it.types }.indexOf(t).takeIf { it >= 0 } ?: Int.MAX_VALUE
}

/** {@inheritDoc} */
/**
 * Resolve a row of the input CSV to the unique identity of the asset it describes.
 *
 * The identity is built from the mandatory `typeName` and `qualifiedName` columns of the
 * row; both are whitespace-trimmed before use. Note: [connectionsMap] is not consulted by
 * this implementation — qualifiedNames in asset import files are already fully resolved.
 *
 * @param values the values of a single row of the CSV
 * @param header the column names of the CSV, in order
 * @param connectionsMap mapping of connection identities to qualifiedNames (unused here)
 * @return the (typeName, qualifiedName) identity of the asset on this row
 * @throws IOException if either mandatory column is missing from the header
 */
override fun resolveAsset(
    values: List<String>,
    header: List<String>,
    connectionsMap: Map<AssetResolver.ConnectionIdentity, String>,
): AssetIdentity {
    val typeColumn = header.indexOf(Asset.TYPE_NAME.atlanFieldName)
    if (typeColumn < 0) {
        throw IOException(
            "Unable to find the column 'typeName'. This is a mandatory column in the input CSV.",
        )
    }
    val qualifiedNameColumn = header.indexOf(Asset.QUALIFIED_NAME.atlanFieldName)
    if (qualifiedNameColumn < 0) {
        throw IOException(
            "Unable to find the column 'qualifiedName'. This is a mandatory column in the input CSV.",
        )
    }
    return AssetIdentity(
        CSVXformer.trimWhitespace(values[typeColumn]),
        CSVXformer.trimWhitespace(values[qualifiedNameColumn]),
    )
}

/**
 * Deliberately unsupported: this resolver identifies assets directly via [resolveAsset]
 * from explicit `typeName` + `qualifiedName` columns, so qualifiedName decomposition is
 * never needed for asset import files.
 *
 * @throws IllegalStateException always — reaching this indicates a logic error upstream
 */
override fun getQualifiedNameDetails(
    row: List<String>,
    header: List<String>,
    typeName: String,
): QualifiedNameDetails {
    throw IllegalStateException("This method should never be called. Please raise an issue if you discover this in any log file.")
}
}

/**
 * Pre-process the assets import file, gathering the details (types present, connection
 * qualifiedNames, links, term assignments) needed before the actual import runs.
 */
private fun preprocess(): Results {
    val preprocessor = Preprocessor(filename, fieldSeparator, logger)
    return preprocessor.preprocess<Results>()
}

private class Preprocessor(
class Preprocessor(
originalFile: String,
fieldSeparator: Char,
logger: KLogger,
Expand All @@ -829,6 +877,7 @@ class AssetImporter(
fieldSeparator = fieldSeparator,
) {
private val typesInFile = mutableSetOf<String>()
private var connectionQNs = mutableSetOf<String>()

/** {@inheritDoc} */
override fun preprocessRow(
Expand All @@ -842,30 +891,41 @@ class AssetImporter(
if (typeName.isNotBlank()) {
typesInFile.add(row[typeIdx])
}
val qualifiedName = CSVXformer.trimWhitespace(row.getOrNull(header.indexOf(Asset.QUALIFIED_NAME.atlanFieldName)) ?: "")
val connectionQNFromAsset = StringUtils.getConnectionQualifiedName(qualifiedName)
connectionQNs.add(connectionQNFromAsset)
return row
}

/** {@inheritDoc} */
override fun finalize(
header: List<String>,
outputFile: String?,
): RowPreprocessor.Results {
): DeltaProcessor.Results {
val results = super.finalize(header, outputFile)
return Results(
connectionQN = if (connectionQNs.isNotEmpty()) connectionQNs.first() else NO_CONNECTION_QN,
multipleConnections = connectionQNs.size > 1,
hasLinks = results.hasLinks,
hasTermAssignments = results.hasTermAssignments,
outputFile = outputFile ?: filename,
typesInFile = typesInFile,
)
}
}

private class Results(
class Results(
connectionQN: String,
multipleConnections: Boolean,
hasLinks: Boolean,
hasTermAssignments: Boolean,
outputFile: String,
val typesInFile: Set<String>,
) : RowPreprocessor.Results(
) : DeltaProcessor.Results(
assetRootName = connectionQN,
hasLinks = hasLinks,
hasTermAssignments = hasTermAssignments,
outputFile = null,
multipleConnections = multipleConnections,
preprocessedFile = outputFile,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.atlan.pkg.PackageContext
import com.atlan.pkg.Utils
import com.atlan.pkg.serde.FieldSerde
import com.atlan.pkg.serde.csv.ImportResults
import com.atlan.pkg.util.DeltaProcessor
import kotlin.system.exitProcess

/**
Expand All @@ -16,6 +17,8 @@ import kotlin.system.exitProcess
object Importer {
private val logger = Utils.getLogger(this.javaClass.name)

const val PREVIOUS_FILES_PREFIX = "csa-asset-import"

@JvmStatic
fun main(args: Array<String>) {
val outputDirectory = if (args.isEmpty()) "tmp" else args[0]
Expand Down Expand Up @@ -77,13 +80,50 @@ object Importer {
ctx.config.assetsKey,
)
FieldSerde.FAIL_ON_ERRORS.set(ctx.config.assetsFailOnErrors)
logger.info { "=== Importing assets... ===" }
val assetImporter = AssetImporter(ctx, assetsInput, logger)
val includes = assetImporter.preprocess()
if (includes.hasLinks) {
val previousFileDirect = ctx.config.assetsPreviousFileDirect
val preprocessedDetails =
AssetImporter
.Preprocessor(assetsInput, ctx.config.assetsFieldSeparator[0], logger)
.preprocess<AssetImporter.Results>()
if (preprocessedDetails.hasLinks) {
ctx.linkCache.preload()
}
assetImporter.import()
DeltaProcessor(
ctx = ctx,
semantic = ctx.config.assetsDeltaSemantic,
qualifiedNamePrefix = preprocessedDetails.assetRootName,
removalType = ctx.config.assetsDeltaRemovalType,
previousFilesPrefix = PREVIOUS_FILES_PREFIX,
resolver = AssetImporter,
preprocessedDetails = preprocessedDetails,
typesToRemove = emptyList(),
logger = logger,
reloadSemantic = ctx.config.assetsDeltaReloadCalculation,
previousFilePreprocessor =
AssetImporter.Preprocessor(
previousFileDirect,
ctx.config.assetsFieldSeparator[0],
logger,
),
outputDirectory = outputDirectory,
).use { delta ->

delta.calculate()

logger.info { "=== Importing assets... ===" }
val assetImporter = AssetImporter(ctx, delta, assetsInput, logger)
assetImporter.preprocess() // Note: we still do this to detect any cyclical relationships
val importedAssets = assetImporter.import()

delta.processDeletions()

// Note: we won't close the original set of changes here, as we'll combine it later for a full set of changes
// (at which point, it will be closed)
ImportResults.getAllModifiedAssets(ctx.client, false, importedAssets).use { modifiedAssets ->
delta.updateConnectionCache(modifiedAssets)
}
importedAssets
}
} else {
null
}
Expand Down Expand Up @@ -116,14 +156,6 @@ object Importer {
} else {
null
}

ImportResults.getAllModifiedAssets(ctx.client, false, resultsAssets).use { allModified ->
Utils.updateConnectionCache(
client = ctx.client,
added = allModified,
fallback = outputDirectory,
)
}
return ImportResults.combineAll(ctx.client, true, resultsGTC, resultsDDP, resultsAssets)
}
}
Loading

0 comments on commit d79d3c7

Please sign in to comment.