Skip to content

Commit

Permalink
Adds parallel asset batching and refactors CSV loaders for reuse
Browse files Browse the repository at this point in the history
Signed-off-by: Christopher Grote <[email protected]>
  • Loading branch information
cmgrote committed Nov 28, 2023
1 parent 4a82d75 commit 882a38d
Show file tree
Hide file tree
Showing 22 changed files with 577 additions and 914 deletions.
2 changes: 2 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ jnanoid = "2.0.0"
numaflow = "0.4.8"
awssdk = "2.20.68"
system-stubs = "2.1.5"
fastcsv = "2.2.2"

[libraries]
jackson-databind = { module = "com.fasterxml.jackson.core:jackson-databind", version.ref = "jackson" }
Expand All @@ -28,6 +29,7 @@ jnanoid = { module = "com.aventrix.jnanoid:jnanoid", version.ref = "jnanoid" }
numaflow-java = { module = "io.numaproj.numaflow:numaflow-java", version.ref = "numaflow" }
awssdk-s3 = { module = "software.amazon.awssdk:s3", version.ref = "awssdk" }
system-stubs = { module = "uk.org.webcompere:system-stubs-testng", version.ref = "system-stubs" }
fastcsv = { module = "de.siegmar:fastcsv", version.ref = "fastcsv" }

[bundles]
java-test = [ "jnanoid", "testng", "wiremock" ]
Expand Down
2 changes: 2 additions & 0 deletions package-toolkit/runtime/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ plugins {

dependencies {
api(libs.jackson.kotlin)
api(libs.fastcsv)
// You would not need the dependencies below in reality, they are to simulate a running tenant
testImplementation(libs.bundles.java.test)
testImplementation(project(":mocks"))
Expand All @@ -29,6 +30,7 @@ tasks {
include(dependency("org.apache.logging.log4j:log4j-slf4j2-impl:.*"))
include(dependency("org.jetbrains.kotlin:kotlin-reflect:.*"))
include(dependency("com.fasterxml.jackson.module:jackson-module-kotlin:.*"))
include(dependency("de.siegmar:fastcsv:.*"))
}
mergeServiceFiles()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import com.atlan.pkg.serde.cell.CellXformer
import com.atlan.serde.Serde
import mu.KLogger
import java.lang.reflect.Method
import java.util.concurrent.ThreadLocalRandom
import java.util.concurrent.atomic.AtomicBoolean

/**
Expand Down Expand Up @@ -118,6 +119,7 @@ object FieldSerde {
fun getBuilderForType(typeName: String): Asset.AssetBuilder<*, *> {
val assetClass = Serde.getAssetClassForType(typeName)
val method = assetClass.getMethod("_internal")
return method.invoke(null) as Asset.AssetBuilder<*, *>
return (method.invoke(null) as Asset.AssetBuilder<*, *>)
.guid("-" + ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE - 1))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import com.atlan.model.assets.Readme
import com.atlan.pkg.Utils
import com.atlan.pkg.cache.LinkCache
import com.atlan.serde.Serde
import com.atlan.util.AssetBatch
import com.atlan.util.ParallelBatch
import mu.KLogger
import java.util.concurrent.atomic.AtomicLong

Expand Down Expand Up @@ -97,7 +97,7 @@ object AssetRefXformer {
fun buildRelated(
from: Asset,
relatedAssets: Map<String, Collection<Asset>>,
batch: AssetBatch,
batch: ParallelBatch,
count: AtomicLong,
totalRelated: AtomicLong,
logger: KLogger,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* SPDX-License-Identifier: Apache-2.0
Copyright 2023 Atlan Pte. Ltd. */
package com.atlan.pkg.rab
package com.atlan.pkg.serde.csv

import com.atlan.model.assets.Asset
import com.atlan.pkg.serde.RowDeserialization
Expand All @@ -15,21 +15,23 @@ interface AssetGenerator {
*
* @param row the row of values from which to build the asset
* @param header list of field names in the same order as columns in the tabular data
* @param typeName index of the typeName column
* @param typeIdx numeric index within the columns of the typeName field
* @param qnIdx numeric index within the columns of the qualifiedName field
* @param skipColumns columns to skip, i.e. that need to be handled in a later pass
* @return the asset(s) built from the values on the row
*/
fun buildFromRow(row: List<String>, header: List<String>, typeName: Int, skipColumns: Set<String>): RowDeserialization?
fun buildFromRow(row: List<String>, header: List<String>, typeIdx: Int, qnIdx: Int, skipColumns: Set<String>): RowDeserialization?

/**
* Check whether to include this row as part of the processing (true) or not (false).
*
* @param row of values
* @param header column names
* @param typeIdx index of the typeName
* @param qnIdx index of the qualifiedName
* @return true if the row should be included in the import, or false if not
*/
fun includeRow(row: List<String>, header: List<String>, typeIdx: Int): Boolean
fun includeRow(row: List<String>, header: List<String>, typeIdx: Int, qnIdx: Int): Boolean

/**
* Start a builder for the asset on this row.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/* SPDX-License-Identifier: Apache-2.0
Copyright 2023 Atlan Pte. Ltd. */
package com.atlan.pkg.serde.csv

import com.atlan.cache.ReflectionCache
import com.atlan.model.assets.Asset
import com.atlan.model.fields.AtlanField
import com.atlan.model.fields.SearchableField
import com.atlan.pkg.serde.RowDeserialization
import com.atlan.pkg.serde.RowDeserializer
import com.atlan.serde.Serde
import mu.KLogger
import java.lang.reflect.InvocationTargetException
import kotlin.system.exitProcess

/**
* Import assets into Atlan from a provided CSV file.
*
* Only the assets and attributes in the provided CSV file will attempt to be loaded.
* By default, any blank values in a cell in the CSV file will be ignored. If you would like any
* particular column's blank values to actually overwrite (i.e. remove) existing values for that
* asset in Atlan, then add that column's field to getAttributesToOverwrite.
*
* @param filename name of the file to import
* @param logger through which to record progress and any errors
* @param typeNameFilter name of the types that should be processed (primarily useful for multi-pass loads)
* @param attrsToOverwrite list of fields that should be overwritten in Atlan, if their value is empty in the CSV
* @param updateOnly if true, only update an asset (first check it exists), if false allow upserts (create if it does not exist)
* @param batchSize maximum number of records to save per API request
*/
abstract class CSVImporter(
private val filename: String,
protected val logger: KLogger,
protected val typeNameFilter: String = "",
private val attrsToOverwrite: List<AtlanField> = listOf(),
private val updateOnly: Boolean = false,
private val batchSize: Int = 20,
) : AssetGenerator {

/**
* Actually run the import.
*
* @param columnsToSkip (optional) columns in the CSV file to skip when loading (primarily useful for multi-pass loads)
*/
open fun import(columnsToSkip: Set<String> = setOf()) {
CSVReader(filename, updateOnly).use { csv ->
val start = System.currentTimeMillis()
val anyFailures = csv.streamRows(this, batchSize, logger, columnsToSkip)
logger.info { "Total time taken: ${System.currentTimeMillis() - start} ms" }
if (anyFailures) {
logger.error { "Some errors detected, failing the workflow." }
exitProcess(1)
}
cacheCreated(csv.created)
}
}

/** {@inheritDoc} */
override fun buildFromRow(row: List<String>, header: List<String>, typeIdx: Int, qnIdx: Int, skipColumns: Set<String>): RowDeserialization? {
// Deserialize the objects represented in that row (could be more than one due to flattening
// of in particular things like READMEs and Links)
if (includeRow(row, header, typeIdx, qnIdx)) {
val typeName = typeNameFilter.ifBlank {
row.getOrElse(typeIdx) { "" }
}
val qualifiedName = row.getOrElse(qnIdx) { "" }
val deserializer = RowDeserializer(
heading = header,
row = row,
typeIdx = typeIdx,
qnIdx = qnIdx,
typeName = typeName,
qualifiedName = qualifiedName,
logger = logger,
skipColumns = skipColumns,
)
val assets = deserializer.getAssets(getBuilder(deserializer))
if (assets != null) {
val builder = assets.primary
val candidate = builder.build()
val identity = RowDeserialization.AssetIdentity(candidate.typeName, candidate.qualifiedName)
// Then apply any field clearances based on attributes configured in the job
for (field in attrsToOverwrite) {
clearField(field, candidate, builder)
// If there are no related assets
if (!assets.related.containsKey(field.atlanFieldName)) {
assets.delete.add(field)
}
}
return RowDeserialization(identity, builder, assets.related, assets.delete)
}
}
return null
}

/** {@inheritDoc} */
override fun includeRow(row: List<String>, header: List<String>, typeIdx: Int, qnIdx: Int): Boolean {
return row[typeIdx] == typeNameFilter
}

/**
* Check if the provided field should be cleared, and if so clear it.
*
* @param field to check if it is empty and should be cleared
* @param candidate the asset on which to check whether the field is empty (or not)
* @param builder the builder against which to clear the field
* @return true if the field was cleared, false otherwise
*/
internal fun clearField(field: AtlanField, candidate: Asset, builder: Asset.AssetBuilder<*, *>): Boolean {
try {
val getter = ReflectionCache.getGetter(
Serde.getAssetClassForType(candidate.typeName),
field.atlanFieldName,
)
val value = getter.invoke(candidate)
if (value == null ||
(Collection::class.java.isAssignableFrom(value.javaClass) && (value as Collection<*>).isEmpty())
) {
builder.nullField(field.atlanFieldName)
return true
}
} catch (e: ClassNotFoundException) {
logger.error(
"Unknown type {} — cannot clear {}.",
candidate.typeName,
field.atlanFieldName,
e,
)
} catch (e: IllegalAccessException) {
logger.error(
"Unable to clear {} on: {}::{}",
field.atlanFieldName,
candidate.typeName,
candidate.qualifiedName,
e,
)
} catch (e: InvocationTargetException) {
logger.error(
"Unable to clear {} on: {}::{}",
field.atlanFieldName,
candidate.typeName,
candidate.qualifiedName,
e,
)
}
return false
}

companion object {
/**
* Determine which (if any) attributes should be cleared (removed) if they are empty in the input file.
*
* @param attrNames the list of attribute names provided through the configuration
* @param fileInfo a descriptor to qualify for which file the attributes are being set
* @param logger through which to record information
* @return parsed list of attribute names to be cleared
*/
fun attributesToClear(attrNames: MutableList<String>, fileInfo: String, logger: KLogger): List<AtlanField> {
if (attrNames.contains(Asset.CERTIFICATE_STATUS.atlanFieldName)) {
attrNames.add(Asset.CERTIFICATE_STATUS_MESSAGE.atlanFieldName)
}
if (attrNames.contains(Asset.ANNOUNCEMENT_TYPE.atlanFieldName)) {
attrNames.add(Asset.ANNOUNCEMENT_TITLE.atlanFieldName)
attrNames.add(Asset.ANNOUNCEMENT_MESSAGE.atlanFieldName)
}
logger.info { "Adding attributes to be cleared, if blank (for $fileInfo): $attrNames" }
val attrFields = mutableListOf<AtlanField>()
for (name in attrNames) {
attrFields.add(SearchableField(name, name))
}
return attrFields
}
}
}
Loading

0 comments on commit 882a38d

Please sign in to comment.