Skip to content

Commit

Permalink
Allow passthrough of previous files prefix (for backwards compatibility)
Browse files Browse the repository at this point in the history
Signed-off-by: Christopher Grote <[email protected]>
  • Loading branch information
cmgrote committed Jan 28, 2025
1 parent 195af31 commit fe5c8b1
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ data class AssetImportCfg(
@JsonProperty("assets_delta_removal_type") val assetsDeltaRemovalType: String = "archive",
@JsonProperty("assets_delta_reload_calculation") val assetsDeltaReloadCalculation: String = "all",
@JsonProperty("assets_previous_file_direct") val assetsPreviousFileDirect: String = "",
@JsonProperty("assets_previous_file_prefix") val assetsPreviousFilePrefix: String = "",
@JsonProperty("assets_config") val assetsConfig: String? = null,
@JsonDeserialize(using = WidgetSerde.MultiSelectDeserializer::class)
@JsonSerialize(using = WidgetSerde.MultiSelectSerializer::class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ object Importer {
semantic = ctx.config.assetsDeltaSemantic,
qualifiedNamePrefix = preprocessedDetails.assetRootName,
removalType = ctx.config.assetsDeltaRemovalType,
previousFilesPrefix = PREVIOUS_FILES_PREFIX,
previousFilesPrefix = ctx.config.assetsPreviousFilePrefix.ifBlank { PREVIOUS_FILES_PREFIX },
resolver = AssetImporter,
preprocessedDetails = preprocessedDetails,
typesToRemove = emptyList(),
Expand Down
7 changes: 7 additions & 0 deletions samples/packages/asset-import/src/main/resources/package.pkl
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,13 @@ uiConfig {
helpText = "Path to a direct file (locally) to use for delta processing."
fallback = ""
}
// Hidden, optional text input: object store prefix in which previous files
// exist for delta processing (per helpText). Hidden from the UI (hide = true)
// and defaults to an empty string — per the commit message, this exists as a
// backwards-compatibility passthrough; presumably set programmatically by a
// calling package rather than by an end user — TODO confirm with callers.
["assets_previous_file_prefix"] = new TextInput {
title = "Previous files location"
required = false
hide = true
helpText = "Object store prefix in which previous files exist for delta processing."
fallback = ""
}
["assets_config"] = new Radio {
title = "Options"
required = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@ import java.util.stream.Stream
* asset in Atlan, then add that column's field to getAttributesToOverwrite.
*
* @param ctx context through which this package is running
* @param preprocessed details of the preprocessed CSV file
* @param inputFile the input file containing connection details
* @param logger through which to record logging
*/
class ConnectionImporter(
ctx: PackageContext<RelationalAssetsBuilderCfg>,
private val preprocessed: Importer.Results,
private val inputFile: String,
logger: KLogger,
) : AssetImporter(
ctx = ctx,
delta = null,
filename = preprocessed.preprocessedFile,
filename = inputFile,
// Only allow full or updates to connections, as partial connections would be hidden
// and impossible to delete via utilities like the Connection Delete workflow
typeNameFilter = Connection.TYPE_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,64 @@ object Importer {
ctx.config.assetsPrefix,
assetsKey,
)
val targetHeaders = CSVXformer.getHeader(assetsInput, fieldSeparator).toMutableList()

ctx.connectionCache.preload()

FieldSerde.FAIL_ON_ERRORS.set(ctx.config.assetsFailOnErrors)
logger.info { "=== Importing assets... ===" }

logger.info { " --- Importing connections... ---" }
// Note: we force-track the batches here to ensure any created connections are cached
// (without tracking, any connections created will NOT be cached, either, which will then cause issues
// with the subsequent processing steps.)
// We also need to load these connections first, irrespective of any delta calculation, so that
// we can be certain we will be able to resolve the assets' qualifiedNames (for subsequent processing)
val connectionImporter = ConnectionImporter(ctx, assetsInput, logger)
connectionImporter.import()?.close()

val transformedFile = transform(ctx, fieldSeparator, assetsInput)

val previousFileXformed =
if (ctx.config.previousFileDirect.isNotBlank()) {
transform(ctx, fieldSeparator, ctx.config.previousFileDirect)
} else {
""
}

val importConfig =
AssetImportCfg(
assetsFile = transformedFile,
assetsUpsertSemantic = ctx.config.assetsUpsertSemantic,
assetsDeltaSemantic = ctx.config.deltaSemantic,
assetsDeltaRemovalType = ctx.config.deltaRemovalType,
assetsDeltaReloadCalculation = ctx.config.deltaReloadCalculation,
assetsPreviousFileDirect = previousFileXformed,
assetsPreviousFilePrefix = PREVIOUS_FILES_PREFIX,
assetsAttrToOverwrite = ctx.config.assetsAttrToOverwrite,
assetsFailOnErrors = ctx.config.assetsFailOnErrors,
assetsFieldSeparator = ctx.config.assetsFieldSeparator,
assetsBatchSize = ctx.config.assetsBatchSize,
trackBatches = ctx.config.trackBatches,
)
Utils.initializeContext(importConfig, ctx).use { iCtx ->
com.atlan.pkg.aim.Importer
.import(iCtx, outputDirectory)
?.close()
}
}

private fun transform(
ctx: PackageContext<RelationalAssetsBuilderCfg>,
fieldSeparator: Char,
inputFile: String,
): String {
val targetHeaders = getHeader(inputFile, fieldSeparator).toMutableList()
// Inject two columns at the end that we need for column assets
targetHeaders.add(Column.ORDER.atlanFieldName)
targetHeaders.add(ColumnXformer.COLUMN_PARENT_QN)
val revisedFile = Paths.get("$assetsInput.CSA_RAB.csv")
val revisedFile = Paths.get("$inputFile.CSA_RAB.csv")
val preprocessedDetails =
Preprocessor(assetsInput, fieldSeparator)
Preprocessor(inputFile, fieldSeparator)
.preprocess<Results>(
outputFile = revisedFile.toString(),
outputHeaders = targetHeaders,
Expand All @@ -103,25 +154,11 @@ object Importer {
ctx.termCache.preload()
}

ctx.connectionCache.preload()

FieldSerde.FAIL_ON_ERRORS.set(ctx.config.assetsFailOnErrors)
logger.info { "=== Importing assets... ===" }

logger.info { " --- Importing connections... ---" }
// Note: we force-track the batches here to ensure any created connections are cached
// (without tracking, any connections created will NOT be cached, either, which will then cause issues
// with the subsequent processing steps.)
// We also need to load these connections first, irrespective of any delta calculation, so that
// we can be certain we will be able to resolve the assets' qualifiedNames (for subsequent processing)
val connectionImporter = ConnectionImporter(ctx, preprocessedDetails, logger)
connectionImporter.import()?.close()

val completeHeaders = BASE_OUTPUT_HEADERS.toMutableList()
val transformedFile = "$assetsInput.transformed.csv"
val transformedFile = "$inputFile.transformed.csv"
// Determine any non-standard RAB fields in the header and append them to the end of
// the list of standard header fields, so they're passed-through to asset import
val inputHeaders = getHeader(preprocessedDetails.preprocessedFile, fieldSeparator = ctx.config.assetsFieldSeparator[0]).toMutableList()
val inputHeaders = getHeader(preprocessedDetails.preprocessedFile, fieldSeparator = fieldSeparator).toMutableList()
inputHeaders.removeAll(BASE_OUTPUT_HEADERS)
inputHeaders.removeAll(
listOf(
Expand Down Expand Up @@ -171,26 +208,7 @@ object Importer {
val columnXformer = ColumnXformer(ctx, completeHeaders, preprocessedDetails, logger)
columnXformer.transform(writer)
}

val importConfig =
AssetImportCfg(
assetsFile = transformedFile,
assetsUpsertSemantic = ctx.config.assetsUpsertSemantic,
assetsDeltaSemantic = ctx.config.deltaSemantic,
assetsDeltaRemovalType = ctx.config.deltaRemovalType,
assetsDeltaReloadCalculation = ctx.config.deltaReloadCalculation,
assetsPreviousFileDirect = ctx.config.previousFileDirect,
assetsAttrToOverwrite = ctx.config.assetsAttrToOverwrite,
assetsFailOnErrors = ctx.config.assetsFailOnErrors,
assetsFieldSeparator = ctx.config.assetsFieldSeparator,
assetsBatchSize = ctx.config.assetsBatchSize,
trackBatches = ctx.config.trackBatches,
)
Utils.initializeContext(importConfig, ctx).use { iCtx ->
com.atlan.pkg.aim.Importer
.import(iCtx, outputDirectory)
?.close()
}
return transformedFile
}

private class Preprocessor(
Expand Down

0 comments on commit fe5c8b1

Please sign in to comment.