Commit
Merge branch 'main' into APP-4737
cmgrote authored Jan 29, 2025
2 parents e13159d + 2941d2f commit 044ec89
Showing 38 changed files with 1,525 additions and 663 deletions.
6 changes: 3 additions & 3 deletions buildSrc/build.gradle.kts
@@ -9,10 +9,10 @@ repositories {
}

dependencies {
implementation("org.jetbrains.kotlin.jvm:org.jetbrains.kotlin.jvm.gradle.plugin:2.1.0")
implementation("org.jetbrains.kotlin.jvm:org.jetbrains.kotlin.jvm.gradle.plugin:2.1.10")
implementation("com.diffplug.spotless:spotless-plugin-gradle:7.0.2")
implementation("io.freefair.gradle:lombok-plugin:8.11")
implementation("io.freefair.gradle:lombok-plugin:8.12")
implementation("net.ltgt.gradle:gradle-errorprone-plugin:4.0.1")
implementation("com.adarshr:gradle-test-logger-plugin:4.0.0")
implementation("org.pkl-lang:org.pkl-lang.gradle.plugin:0.27.1")
implementation("org.pkl-lang:org.pkl-lang.gradle.plugin:0.27.2")
}
14 changes: 7 additions & 7 deletions gradle/libs.versions.toml
@@ -1,14 +1,14 @@
[versions]
jackson = "2.18.2"
slf4j = "2.0.16"
elasticsearch = "8.17.0"
elasticsearch = "8.17.1"
freemarker = "2.3.34"
classgraph = "4.8.179"
testng = "7.10.2"
log4j = "2.24.3"
wiremock = "3.10.0"
jnanoid = "2.0.0"
awssdk = "2.30.2"
awssdk = "2.30.7"
gcs = "26.51.0" # was: "26.53.0"
system-stubs = "2.1.7"
fastcsv = "3.4.0"
@@ -22,14 +22,14 @@ commons-io = "2.18.0"
sqlite = "3.48.0.0"
jakarta-mail = "2.1.3"
angus-mail = "2.0.3"
pkl = "0.27.1"
pkl = "0.27.2"
adls = "12.22.0"
azure = "1.15.0"
guava = "33.4.0-jre"
openlineage = "1.27.0"
kotlin = "2.1.0"
kotlin = "2.1.10"
kotlin-mu = "3.0.5"
rocksdb = "9.8.4"
rocksdb = "9.10.0"
jetty = "12.0.16"
netty = "4.1.117.Final"
otel = "1.42.1" # was: "1.45.0"
@@ -89,6 +89,6 @@ otel = [ "otel-sdk", "otel-exporter", "otel-autoconfig", "otel-appender" ]
poi = [ "apache-poi", "apache-poi-ooxml" ]

[plugins]
-shadow = { id = "com.gradleup.shadow", version = "9.0.0-beta4" }
+shadow = { id = "com.gradleup.shadow", version = "9.0.0-beta5" }
git-publish = { id = "org.ajoberstar.git-publish", version = "4.2.2" }
-pkl = { id = "org.pkl-lang", version = "0.27.1" }
+pkl = { id = "org.pkl-lang", version = "0.27.2" }
8 changes: 7 additions & 1 deletion package-toolkit/config/src/main/resources/Renderers.pkl
@@ -311,6 +311,7 @@ const function getLoggingConfPy(): FileOutput = new FileOutput {
[formatter_jsonFormatter]
format=%(asctime)s - %(name)s - %(levelname)s - %(message)s
+class=pyatlan.utils.JsonFormatter
"""
}

@@ -322,15 +323,18 @@ const function getRequirementsPy(): FileOutput = new FileOutput {
+opentelemetry-sdk==1.29.0
+opentelemetry-instrumentation-logging==0.50b0
+opentelemetry-exporter-otlp==1.29.0
"""
}

/// Render the baseline requirements-dev.txt for a Python custom package.
const function getRequirementsDevPy(): FileOutput = new FileOutput {
text = """
black
pytest
pytest-order
nanoid
"""
}

@@ -355,6 +359,7 @@ const function getDockerfilePy(pkgName: String): FileOutput = new FileOutput {
WORKDIR /app
+ENTRYPOINT ["/usr/local/bin/dumb-init", "--"]
"""
}

@@ -373,7 +378,7 @@ const function getConfigClassPy(m: Config): FileOutput = new FileOutput {
validate_multiselect,
validate_connector_and_connection,
)
-from typing import Any, Optional, Union, Dict
+from typing import Any, Optional, Union, Dict, List
import logging.config
PARENT = Path(__file__).parent
@@ -472,6 +477,7 @@ const function getConfigClassPy(m: Config): FileOutput = new FileOutput {
if field_value := getattr(self, key):
ret_val[value["env"]] = field_value.json()
return ret_val
"""
}.join("\n")
}
CSVXformer.kt
@@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicLong
*/
abstract class CSVXformer(
private val inputFile: String,
-    private val targetHeader: Iterable<String?>?,
+    val targetHeader: Iterable<String?>?,
private val logger: KLogger,
private val fieldSeparator: Char = ',',
) : Closeable,
@@ -138,6 +138,20 @@ abstract class CSVXformer(
}
}

+    /**
+     * Run the transformation and produce the output into the specified file.
+     * Note: when using this method, it is your responsibility to first output the header into the writer.
+     * (No header will ever be included via this method.)
+     *
+     * @param writer CSV writer into which the transformed CSV output will be written.
+     */
+    fun transform(writer: CsvWriter) {
+        val start = System.currentTimeMillis()
+        logger.info { "Transforming $inputFile..." }
+        mapWithoutHeader(writer)
+        logger.info { "Total transformation time: ${System.currentTimeMillis() - start} ms" }
+    }
+
/**
* Actually run the transformation.
*
@@ -146,6 +160,15 @@
private fun map(writer: CsvWriter) {
// Start by outputting the header row in the target CSV file
writer.writeRecord(targetHeader)
+        mapWithoutHeader(writer)
+    }
+
+    /**
+     * Actually run the transformation, not including any header.
+     *
+     * @param writer into which to write each transformed row of data
+     */
+    private fun mapWithoutHeader(writer: CsvWriter) {
// Calculate total number of rows that need to be transformed...
val filteredRowCount = AtomicLong(0)
counter.stream().skip(1).forEach { row ->
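The new transform(CsvWriter) overload above shifts header emission to the caller. A minimal usage sketch, not part of the commit (the caller function and output path are hypothetical; CsvWriter is FastCSV's writer, which the version catalog above pins at 3.4.0), taking advantage of targetHeader now being public:

import de.siegmar.fastcsv.writer.CsvWriter
import java.nio.file.Paths

fun writeTransformed(xformer: CSVXformer, outputPath: String) {
    CsvWriter.builder().build(Paths.get(outputPath)).use { writer ->
        // Per the new method's contract, the header is the caller's responsibility.
        xformer.targetHeader?.let { writer.writeRecord(it) }
        // Streams only the transformed data rows into the same writer.
        xformer.transform(writer)
    }
}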
DeltaProcessor.kt
@@ -62,6 +62,14 @@ class DeltaProcessor(
*/
fun calculate() {
if (semantic == "full") {
+            if (preprocessedDetails.multipleConnections) {
+                throw IllegalStateException(
+                    """
+                    Assets in multiple connections detected in the input file.
+                    Full delta processing currently only works for a single connection per input file, exiting.
+                    """.trimIndent(),
+                )
+            }
if (qualifiedNamePrefix.isNullOrBlank()) {
logger.warn { "Unable to determine qualifiedName prefix, cannot calculate any delta." }
} else {
@@ -219,12 +227,14 @@
* @param hasLinks whether there are any links in the input file
* @param hasTermAssignments whether there are any term assignments in the input file
* @param preprocessedFile full path to the preprocessed input file
+ * @param multipleConnections whether multiple connections were present in the input file (true) or only a single connection (false)
*/
open class Results(
val assetRootName: String,
hasLinks: Boolean,
hasTermAssignments: Boolean,
val preprocessedFile: String,
+    val multipleConnections: Boolean = false,
) : RowPreprocessor.Results(
hasLinks = hasLinks,
hasTermAssignments = hasTermAssignments,
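A minimal sketch of what the new guard means for callers, assuming the surrounding wiring (the values below are illustrative, not from the commit): preprocessor results that report more than one connection now make a "full" run fail fast rather than compute a cross-connection delta.

// Hypothetical preprocessor output spanning two connections:
val results =
    DeltaProcessor.Results(
        assetRootName = "default/snowflake/1700000000", // illustrative connection qualifiedName
        hasLinks = false,
        hasTermAssignments = false,
        preprocessedFile = "/tmp/preprocessed.csv", // illustrative path
        multipleConnections = true,
    )
// A DeltaProcessor constructed over these results with semantic == "full"
// now throws IllegalStateException from calculate() before any deletions occur.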
FileBasedDelta.kt
@@ -271,7 +271,11 @@ class FileBasedDelta(
val deletionType = if (purge) AtlanDeleteType.PURGE else AtlanDeleteType.SOFT
val guidList = guidsToDeleteToDetails.entrySet()
val totalToDelete = guidsToDeleteToDetails.size
logger.info { " --- Deleting ($deletionType) $totalToDelete assets across $removeTypes... ---" }
if (removeTypes.isNotEmpty()) {
logger.info { " --- Deleting ($deletionType) $totalToDelete assets (limited to types: $removeTypes)... ---" }
} else {
logger.info { " --- Deleting ($deletionType) $totalToDelete assets... ---" }
}
val currentCount = AtomicLong(0)
if (totalToDelete < DELETION_BATCH) {
if (totalToDelete > 0) {
AssetImportCfg.kt
@@ -20,6 +20,11 @@ data class AssetImportCfg(
@JsonProperty("assets_prefix") val assetsPrefix: String = "",
@JsonProperty("assets_key") val assetsKey: String = "",
@JsonProperty("assets_upsert_semantic") val assetsUpsertSemantic: String = "update",
@JsonProperty("assets_delta_semantic") val assetsDeltaSemantic: String = "delta",
@JsonProperty("assets_delta_removal_type") val assetsDeltaRemovalType: String = "archive",
@JsonProperty("assets_delta_reload_calculation") val assetsDeltaReloadCalculation: String = "all",
@JsonProperty("assets_previous_file_direct") val assetsPreviousFileDirect: String = "",
@JsonProperty("assets_previous_file_prefix") val assetsPreviousFilePrefix: String = "",
@JsonProperty("assets_config") val assetsConfig: String? = null,
@JsonDeserialize(using = WidgetSerde.MultiSelectDeserializer::class)
@JsonSerialize(using = WidgetSerde.MultiSelectSerializer::class)
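The five new delta-related properties bind from the package's JSON configuration just like the existing ones. A minimal sketch, assuming jackson-module-kotlin is on the classpath and that all omitted fields have defaults (as they do in the fragment above); the values are illustrative:

import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue

val cfg = jacksonObjectMapper().readValue<AssetImportCfg>(
    """
    {
      "assets_upsert_semantic": "upsert",
      "assets_delta_semantic": "full",
      "assets_delta_removal_type": "purge",
      "assets_previous_file_direct": "/tmp/assets-previous.csv"
    }
    """.trimIndent(),
)
// Unspecified knobs keep their declared defaults:
check(cfg.assetsDeltaReloadCalculation == "all")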
AssetImporter.kt
@@ -202,7 +202,13 @@ import com.atlan.pkg.serde.csv.CSVPreprocessor
import com.atlan.pkg.serde.csv.CSVXformer
import com.atlan.pkg.serde.csv.ImportResults
import com.atlan.pkg.serde.csv.RowPreprocessor
+import com.atlan.pkg.util.AssetResolver
+import com.atlan.pkg.util.AssetResolver.QualifiedNameDetails
+import com.atlan.pkg.util.DeltaProcessor
+import com.atlan.util.AssetBatch.AssetIdentity
+import com.atlan.util.StringUtils
import mu.KLogger
+import java.io.IOException

/**
* Import assets into Atlan from a provided CSV file.
@@ -213,11 +219,13 @@ import mu.KLogger
* asset in Atlan, then add that column's field to getAttributesToOverwrite.
*
* @param ctx context in which the package is running
+ * @param delta the processor containing any details about file deltas
* @param filename name of the file to import
* @param logger through which to write log entries
*/
class AssetImporter(
ctx: PackageContext<AssetImportCfg>,
+    private val delta: DeltaProcessor?,
filename: String,
logger: KLogger,
) : CSVImporter(
@@ -264,8 +272,7 @@ class AssetImporter(
.stream()
.filter { it.endDef1.type == it.endDef2.type }
.forEach { cyclicalRelationships.getOrPut(it.endDef1.type) { mutableSetOf() }.add(RelationshipEnds(it.name, it.endDef1.name, it.endDef2.name)) }
-        val results = super.preprocess(outputFile, outputHeaders)
-        return results
+        return super.preprocess(outputFile, outputHeaders)
}

/** {@inheritDoc} */
@@ -391,12 +398,22 @@
typeIdx: Int,
qnIdx: Int,
): Boolean {
-        return if (updateOnly) {
-            // If we are only updating, process in-parallel, in any order
-            row.size >= typeIdx && CSVXformer.trimWhitespace(row.getOrElse(typeIdx) { "" }).isNotBlank()
+        val candidateRow =
+            if (updateOnly) {
+                // If we are only updating, process in-parallel, in any order
+                row.size >= typeIdx && CSVXformer.trimWhitespace(row.getOrElse(typeIdx) { "" }).isNotBlank()
+            } else {
+                // If we are doing more than only updates, process the assets in top-down order
+                row.size >= typeIdx && CSVXformer.trimWhitespace(row.getOrElse(typeIdx) { "" }) == typeToProcess
+            }
+        // Only proceed processing this candidate row if we're doing non-delta processing, or we have
+        // detected that it needs to be loaded via the delta processing
+        return if (candidateRow) {
+            delta?.resolveAsset(row, header)?.let { identity ->
+                delta.reloadAsset(identity)
+            } ?: true
        } else {
-            // If we are doing more than only updates, process the assets in top-down order
-            return row.size >= typeIdx && CSVXformer.trimWhitespace(row.getOrElse(typeIdx) { "" }) == typeToProcess
+            false
}
}

@@ -405,7 +422,8 @@
val types: List<String>,
)

-    companion object {
+    companion object : AssetResolver {
+        const val NO_CONNECTION_QN = "NO_CONNECTION_FOUND"
private val ordering =
listOf(
TypeGrouping(
@@ -814,12 +832,42 @@
types.sortedBy { t ->
ordering.flatMap { it.types }.indexOf(t).takeIf { it >= 0 } ?: Int.MAX_VALUE
}

+    /** {@inheritDoc} */
+    override fun resolveAsset(
+        values: List<String>,
+        header: List<String>,
+        connectionsMap: Map<AssetResolver.ConnectionIdentity, String>,
+    ): AssetIdentity {
+        val typeIdx = header.indexOf(Asset.TYPE_NAME.atlanFieldName)
+        if (typeIdx < 0) {
+            throw IOException(
+                "Unable to find the column 'typeName'. This is a mandatory column in the input CSV.",
+            )
+        }
+        val qnIdx = header.indexOf(Asset.QUALIFIED_NAME.atlanFieldName)
+        if (qnIdx < 0) {
+            throw IOException(
+                "Unable to find the column 'qualifiedName'. This is a mandatory column in the input CSV.",
+            )
+        }
+        val typeName = CSVXformer.trimWhitespace(values[typeIdx])
+        val qualifiedName = CSVXformer.trimWhitespace(values[qnIdx])
+        return AssetIdentity(typeName, qualifiedName)
+    }
+
+    /** {@inheritDoc} */
+    override fun getQualifiedNameDetails(
+        row: List<String>,
+        header: List<String>,
+        typeName: String,
+    ): QualifiedNameDetails = throw IllegalStateException("This method should never be called. Please raise an issue if you discover this in any log file.")
}

/** Pre-process the assets import file. */
private fun preprocess(): Results = Preprocessor(filename, fieldSeparator, logger).preprocess<Results>()

-    private class Preprocessor(
+    class Preprocessor(
originalFile: String,
fieldSeparator: Char,
logger: KLogger,
@@ -829,6 +877,7 @@
fieldSeparator = fieldSeparator,
) {
private val typesInFile = mutableSetOf<String>()
+        private var connectionQNs = mutableSetOf<String>()

/** {@inheritDoc} */
override fun preprocessRow(
@@ -842,30 +891,48 @@
if (typeName.isNotBlank()) {
typesInFile.add(row[typeIdx])
}
+        val qualifiedName = CSVXformer.trimWhitespace(row.getOrNull(header.indexOf(Asset.QUALIFIED_NAME.atlanFieldName)) ?: "")
+        val connectionQNFromAsset = StringUtils.getConnectionQualifiedName(qualifiedName)
+        if (connectionQNFromAsset != null) {
+            connectionQNs.add(connectionQNFromAsset)
+        } else if (typeName == Connection.TYPE_NAME) {
+            // If the qualifiedName comes back as null and the asset itself is a connection, add it
+            connectionQNs.add(qualifiedName)
+        } else {
+            throw IllegalStateException("Found an asset without a valid qualifiedName (of type $typeName): $qualifiedName")
+        }
return row
}

/** {@inheritDoc} */
override fun finalize(
header: List<String>,
outputFile: String?,
-        ): RowPreprocessor.Results {
+        ): DeltaProcessor.Results {
val results = super.finalize(header, outputFile)
return Results(
+                connectionQN = if (connectionQNs.isNotEmpty()) connectionQNs.first() else NO_CONNECTION_QN,
+                multipleConnections = connectionQNs.size > 1,
hasLinks = results.hasLinks,
hasTermAssignments = results.hasTermAssignments,
outputFile = outputFile ?: filename,
typesInFile = typesInFile,
)
}
}

-    private class Results(
+    class Results(
+        connectionQN: String,
+        multipleConnections: Boolean,
hasLinks: Boolean,
hasTermAssignments: Boolean,
outputFile: String,
val typesInFile: Set<String>,
-    ) : RowPreprocessor.Results(
+    ) : DeltaProcessor.Results(
+        assetRootName = connectionQN,
hasLinks = hasLinks,
hasTermAssignments = hasTermAssignments,
-        outputFile = null,
+        multipleConnections = multipleConnections,
+        preprocessedFile = outputFile,
)
}
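Both halves of the new delta plumbing key off the same two columns. An illustrative sketch (the header, row, and qualifiedName values below are invented, and the three-segment connection prefix reflects Atlan's usual qualifiedName convention):

import com.atlan.util.StringUtils

val header = listOf("typeName", "qualifiedName", "name")
val row = listOf("Table", "default/snowflake/1700000000/DB/SCH/T1", "T1")

// The identity the delta logic uses to decide whether a row must be reloaded:
val identity = AssetImporter.resolveAsset(row, header, emptyMap())

// The helper the preprocessor uses to attribute each row to a connection;
// for the row above it yields "default/snowflake/1700000000".
val connectionQN = StringUtils.getConnectionQualifiedName(row[1])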
