Skip to content

Commit

Permalink
Merge pull request #1107 from atlanhq/FT-855
Browse files Browse the repository at this point in the history
Fixes cyclical relationship detection in asset import
  • Loading branch information
cmgrote authored Dec 10, 2024
2 parents 42d0e9f + c229fdc commit db67b4e
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -236,15 +236,17 @@ class AssetImporter(
) {
private var header = emptyList<String>()
private var typeToProcess = ""
private val cyclicalRelationships = mutableMapOf<String, Set<String>>()
private val mapToSecondPass = mutableMapOf<String, Set<String>>()
private val cyclicalRelationships = mutableMapOf<String, MutableSet<RelationshipEnds>>()
private val mapToSecondPass = mutableMapOf<String, MutableSet<String>>()
private val secondPassRemain =
setOf(
Asset.QUALIFIED_NAME.atlanFieldName,
Asset.NAME.atlanFieldName,
Folder.PARENT_QUALIFIED_NAME.atlanFieldName,
Folder.COLLECTION_QUALIFIED_NAME.atlanFieldName,
) // TODO: other required fields, across ALL (non-GTC, non-mesh) types
)

private data class RelationshipEnds(val name: String, val end1: String, val end2: String)

/** {@inheritDoc} */
override fun preprocess(
Expand All @@ -255,8 +257,8 @@ class AssetImporter(
// (meaning relationships where both ends are of the same type)
val typeDefs = ctx.client.typeDefs.list(AtlanTypeCategory.RELATIONSHIP)
typeDefs.relationshipDefs.stream()
.filter { it.endDef1.type == it.endDef2.type && it.endDef1.cardinality == it.endDef2.cardinality }
.forEach { cyclicalRelationships[it.endDef1.type] = setOf(it.endDef1.name, it.endDef2.name) }
.filter { it.endDef1.type == it.endDef2.type }
.forEach { cyclicalRelationships.getOrPut(it.endDef1.type) { mutableSetOf() }.add(RelationshipEnds(it.name, it.endDef1.name, it.endDef2.name)) }
val results = super.preprocess(outputFile, outputHeaders)
return results
}
Expand All @@ -270,26 +272,26 @@ class AssetImporter(
): List<String> {
// Check if the type on this row has any cyclical relationships as headers in the input file
val typeName = CSVXformer.trimWhitespace(row.getOrElse(typeIdx) { "" })
if (!mapToSecondPass.containsKey(typeName)) {
if (this.header.isEmpty()) this.header = header
val cyclical = cyclicalRelationships.getOrElse(typeName) { emptySet() }.toList()
if (cyclical.size == 2) {
val one = cyclical[0]
val two = cyclical[1]
if (header.contains(one) && header.contains(two)) {
// If both ends of the same relationship are in the input file, throw an error
// alerting the user that this can't work and they'll need to pick one end or the other
throw IllegalStateException(
"""
Both ends of the same relationship found in the input file for type $typeName: $one <> $two.
You should only use one end of this relationship or the other when importing.
""".trimIndent(),
)
}
if (this.header.isEmpty()) this.header = header
cyclicalRelationships.getOrElse(typeName) { emptySet() }.toList().forEach { relationship ->
val one = relationship.end1
val two = relationship.end2
if (header.contains(one) && header.contains(two)) {
// If both ends of the same relationship are in the input file, throw an error
// alerting the user that this can't work, and they'll need to pick one end or the other
throw IllegalStateException(
"""
Both ends of the same relationship found in the input file for type $typeName: $one <> $two.
You should only use one end of this relationship or the other when importing.
""".trimIndent(),
)
}
// Retain any of the cyclical relationships that remain so that we can second-pass process them
val secondPassColumns = cyclical.filter { header.contains(it) }.toSet()
mapToSecondPass[typeName] = secondPassColumns
if (header.contains(one)) {
mapToSecondPass.getOrPut(typeName) { mutableSetOf() }.add(one)
} else if (header.contains(two)) {
mapToSecondPass.getOrPut(typeName) { mutableSetOf() }.add(two)
}
}
return row
}
Expand Down
31 changes: 7 additions & 24 deletions sdk/src/main/java/com/atlan/net/LiveAtlanResponseGetter.java
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,7 @@ private static HttpClient buildDefaultHttpClient() {
*/
private static void raiseMalformedJsonError(String responseBody, int responseCode, Throwable e)
throws ApiException {
String details = e == null ? "none" : e.getMessage();
throw new ApiException(ErrorCode.JSON_ERROR, e, responseBody, "" + responseCode, details);
throw new ApiException(ErrorCode.ERROR_PASSTHROUGH, e, "" + responseCode, responseBody, "");
}

/**
Expand All @@ -274,16 +273,9 @@ private static void raiseMalformedJsonError(String responseBody, int responseCod
* @throws AtlanException a more specific exception, based on the details of that response
*/
private static void handleApiError(AtlanResponse response) throws AtlanException {
// Attempt to parse the error into an AtlanError object, but if that fails, fallback
// to just using the raw message body (wasn't JSON to begin with)
AtlanError error = null;

// Check for a 500 response first -- if found, we won't have a JSON body to parse,
// so preemptively exit with a generic ApiException pass-through.
int rc = response.code();
if (rc == 500) {
throw new ApiException(
ErrorCode.ERROR_PASSTHROUGH, null, "" + rc, response.body() == null ? "" : response.body());
}

try {
error = Serde.allInclusiveMapper.readValue(response.body(), AtlanError.class);
} catch (IOException e) {
Expand All @@ -307,8 +299,8 @@ private static void handleApiError(int code, String body) throws AtlanException

// Check for a 500 response first -- if found, we won't have a JSON body to parse,
// so preemptively exit with a generic ApiException pass-through.
if (code == 500) {
throw new ApiException(ErrorCode.ERROR_PASSTHROUGH, null, "" + code, body == null ? "" : body);
if (code >= 500) {
raiseMalformedJsonError(body, code, null);
}

AtlanError error = new AtlanError();
Expand All @@ -326,17 +318,8 @@ private static void handleApiError(int code, String body) throws AtlanException
private static void handleApiError(AtlanEventStreamResponse response) throws AtlanException {
AtlanError error = null;

// Check for a 500 response first -- if found, we won't have a JSON body to parse,
// so preemptively exit with a generic ApiException pass-through.
int rc = response.code();
if (rc == 500) {
throw new ApiException(
ErrorCode.ERROR_PASSTHROUGH,
null,
"" + rc,
response.body() == null ? "" : response.body().toString());
}

// Attempt to parse the error into an AtlanError object, but if that fails, fallback
// to just using the raw message body (wasn't JSON to begin with)
try {
error = Serde.allInclusiveMapper.readValue(response.body().get(0), AtlanError.class);
} catch (IOException e) {
Expand Down

0 comments on commit db67b4e

Please sign in to comment.