Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat] add findRelationshipsV3 API (impl + unit tests) #504

Merged
merged 5 commits into from
Mar 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,23 @@ public static String toJsonString(@Nonnull RecordTemplate recordTemplate) {
}
}

/**
 * Creates a {@link DataMap} object from a serialized JSON string.
 *
 * @param jsonString the JSON string serialized using {@link JacksonDataTemplateCodec}
 * @return the created {@link DataMap}
 * @throws ModelConversionException if the string cannot be deserialized; the underlying
 *     {@link IOException} is attached as the cause
 */
public static DataMap toDataMap(@Nonnull String jsonString) {
  try {
    return DATA_TEMPLATE_CODEC.stringToMap(jsonString);
  } catch (IOException e) {
    // Chain the IOException so the actual parse failure is not lost to callers and logs.
    throw new ModelConversionException("Failed to deserialize DataMap: " + jsonString, e);
  }
}

/**
* Creates a {@link RecordTemplate} object from a serialized JSON string.
*
Expand All @@ -88,14 +105,7 @@ public static String toJsonString(@Nonnull RecordTemplate recordTemplate) {
*/
@Nonnull
public static <T extends RecordTemplate> T toRecordTemplate(@Nonnull Class<T> type, @Nonnull String jsonString) {
DataMap dataMap;
try {
dataMap = DATA_TEMPLATE_CODEC.stringToMap(jsonString);
} catch (IOException e) {
throw new ModelConversionException("Failed to deserialize DataMap: " + jsonString);
}

return toRecordTemplate(type, dataMap);
return toRecordTemplate(type, toDataMap(jsonString));
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.metadata.dao;

import com.linkedin.data.DataMap;
import com.google.common.annotations.VisibleForTesting;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.UnionTemplate;
Expand Down Expand Up @@ -36,6 +37,11 @@
*/
@Slf4j
public class EbeanLocalRelationshipQueryDAO {
// Key under which the relationship union is stored in the wrapper DataMap built for findRelationshipsV3.
public static final String RELATED_TO = "relatedTo";
// Column read from each relationship SqlRow for the source urn; also the key used for it in the wrapper DataMap.
public static final String SOURCE = "source";
// Column read from each relationship SqlRow for the serialized relationship JSON.
public static final String METADATA = "metadata";
// Key that findRelationshipsV3 requires to be present in its wrapOptions map.
public static final String RELATIONSHIP_RETURN_TYPE = "relationship.return.type";
// The only value of RELATIONSHIP_RETURN_TYPE that findRelationshipsV3 accepts.
public static final String MG_INTERNAL_ASSET_RELATIONSHIP_TYPE = "AssetRelationship.proto";
private final EbeanServer _server;
private final MultiHopsTraversalSqlGenerator _sqlGenerator;

Expand Down Expand Up @@ -222,6 +228,34 @@ public <RELATIONSHIP extends RecordTemplate> List<RELATIONSHIP> findRelationship
@Nullable String destinationEntityType, @Nullable LocalRelationshipFilter destinationEntityFilter,
@Nonnull Class<RELATIONSHIP> relationshipType, @Nonnull LocalRelationshipFilter relationshipFilter,
int offset, int count) {
List<SqlRow> sqlRows = findRelationshipsV2V3Core(
sourceEntityType, sourceEntityFilter, destinationEntityType, destinationEntityFilter,
relationshipType, relationshipFilter, offset, count);

return sqlRows.stream()
.map(row -> RecordUtils.toRecordTemplate(relationshipType, row.getString(METADATA)))
.collect(Collectors.toList());
}

/**
* Fetches a list of SqlRow of relationships of a specific type (Urn) based on the given filters if applicable.
*
* @param sourceEntityType type of source entity to query (e.g. "dataset")
* @param sourceEntityFilter the filter to apply to the source entity when querying (not applicable to non-MG entities)
* @param destinationEntityType type of destination entity to query (e.g. "dataset")
* @param destinationEntityFilter the filter to apply to the destination entity when querying (not applicable to non-MG entities)
* @param relationshipType the type of relationship to query
* @param relationshipFilter the filter to apply to relationship when querying
* @param offset the offset query should start at. Ignored if set to a negative value.
* @param count the maximum number of entities to return. Ignored if set to a non-positive value.
* @return A list of relationship records in SqlRow (col: source, destination, metadata, etc).
*/
@Nonnull
private <RELATIONSHIP extends RecordTemplate> List<SqlRow> findRelationshipsV2V3Core(
@Nullable String sourceEntityType, @Nullable LocalRelationshipFilter sourceEntityFilter,
@Nullable String destinationEntityType, @Nullable LocalRelationshipFilter destinationEntityFilter,
@Nonnull Class<RELATIONSHIP> relationshipType, @Nonnull LocalRelationshipFilter relationshipFilter,
int offset, int count) {
validateEntityTypeAndFilter(sourceEntityFilter, sourceEntityType);
validateEntityTypeAndFilter(destinationEntityFilter, destinationEntityType);
validateRelationshipFilter(relationshipFilter);
Expand All @@ -237,11 +271,87 @@ public <RELATIONSHIP extends RecordTemplate> List<RELATIONSHIP> findRelationship
destTableName, destinationEntityFilter,
count, offset);

return _server.createSqlQuery(sql).findList().stream()
.map(row -> RecordUtils.toRecordTemplate(relationshipType, row.getString("metadata")))
return _server.createSqlQuery(sql).findList();
}

/**
 * Finds a list of relationships of a specific type (Urn) based on the given filters if applicable.
 * Similar to findRelationshipsV2, but this method wraps the relationship in a specific class provided by user.
 * The intended use case is for MG internally with AssetRelationship, but since it is an open API, we are leaving room for extensibility.
 *
 * @param sourceEntityType type of source entity to query (e.g. "dataset")
 * @param sourceEntityFilter the filter to apply to the source entity when querying (not applicable to non-MG entities)
 * @param destinationEntityType type of destination entity to query (e.g. "dataset")
 * @param destinationEntityFilter the filter to apply to the destination entity when querying (not applicable to non-MG entities)
 * @param relationshipType the type of relationship to query
 * @param relationshipFilter the filter to apply to relationship when querying
 * @param assetRelationshipClass the wrapper class for the relationship type
 * @param wrapOptions options to wrap the relationship. Must be non-null and map {@link #RELATIONSHIP_RETURN_TYPE}
 *                    to {@link #MG_INTERNAL_ASSET_RELATIONSHIP_TYPE}; no other entry is consulted at the moment.
 * @param offset the offset query should start at. Ignored if set to a negative value.
 * @param count the maximum number of entities to return. Ignored if set to a non-positive value.
 * @return A list of relationship records, each wrapped in an instance of assetRelationshipClass.
 * @throws IllegalArgumentException if wrapOptions is null or does not request the supported return type
 */
@Nonnull
public <ASSET_RELATIONSHIP extends RecordTemplate, RELATIONSHIP extends RecordTemplate> List<ASSET_RELATIONSHIP> findRelationshipsV3(
    @Nullable String sourceEntityType, @Nullable LocalRelationshipFilter sourceEntityFilter,
    @Nullable String destinationEntityType, @Nullable LocalRelationshipFilter destinationEntityFilter,
    @Nonnull Class<RELATIONSHIP> relationshipType, @Nonnull LocalRelationshipFilter relationshipFilter,
    @Nonnull Class<ASSET_RELATIONSHIP> assetRelationshipClass, @Nullable Map<String, Object> wrapOptions,
    int offset, int count) {
  // Map#get returns null for an absent key, so this single equals() also rejects a missing entry.
  if (wrapOptions == null
      || !MG_INTERNAL_ASSET_RELATIONSHIP_TYPE.equals(wrapOptions.get(RELATIONSHIP_RETURN_TYPE))) {
    throw new IllegalArgumentException(
        "Please check your use of the findRelationshipsV3 method. wrapOptions must map '"
            + RELATIONSHIP_RETURN_TYPE + "' to '" + MG_INTERNAL_ASSET_RELATIONSHIP_TYPE
            + "', but was: " + wrapOptions);
  }

  final List<SqlRow> sqlRows = findRelationshipsV2V3Core(
      sourceEntityType, sourceEntityFilter, destinationEntityType, destinationEntityFilter,
      relationshipType, relationshipFilter, offset, count);

  // Each row supplies the serialized relationship (METADATA) and its source urn (SOURCE) for the wrapper.
  return sqlRows.stream()
      .map(row -> createAssetRelationshipWrapperForRelationship(
          relationshipType, assetRelationshipClass, row.getString(METADATA), row.getString(SOURCE), wrapOptions))
      .collect(Collectors.toList());
}

/**
* Wraps the relationship in a specific class provided by user.
* The intended use case is for MG internally with AssetRelationship, but since it is an open API, we are leaving room for extendability.
*
* @param relationshipType the type of relationship to query
* @param assetRelationshipClass the wrapper class for the relationship type. By default, AssetRelationship.
* @param metadata the metadata string which can be parsed into a relationship
* @param sourceUrn the source urn
* @param wrapOptions options to wrap the relationship. Currently unused. Leaving it open for the future.
* @return A wrapped relationship record.
*/
@Nonnull
private <ASSET_RELATIONSHIP extends RecordTemplate, RELATIONSHIP extends RecordTemplate> ASSET_RELATIONSHIP createAssetRelationshipWrapperForRelationship(
@Nonnull Class<RELATIONSHIP> relationshipType, @Nonnull Class<ASSET_RELATIONSHIP> assetRelationshipClass,
@Nonnull String metadata, @Nonnull String sourceUrn, @Nullable Map<String, Object> wrapOptions) {
// TODO: if other type of ASSET_RELATIONSHIP is needed, we need to distinguish it with wrapOptions and handles differently.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean by this? What "other type of ASSET_RELATIONSHIP" can there be? Can you share an example of a wrapOption?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since AssetRelationship is an LI-internal model, we need to pass it in here, so why not make the API params more flexible?

Say that in the future we have an AssetRelationshipV2: we can still reuse this API and change the code to handle it differently.

The wrapOptions map can contain something like { "relationship.version": "2.0" }, which we then check in order to parse differently, or anything else we may want to use in the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My main concern is that we can't keep making new versions of the same API, so adding a map for params makes it more flexible if we want to pass in something new later.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah that makes sense.


// parse metadata json string into DataMap
final DataMap relationshipDataMap = RecordUtils.toDataMap(metadata);

final DataMap relatedToDataMap = new DataMap();
// e.g. "BelongsToV2" -> "belongsToV2"
final String relationshipName = decapitalize(relationshipType.getSimpleName());
relatedToDataMap.put(relationshipName, relationshipDataMap);

final DataMap dataMap = new DataMap();
dataMap.put(RELATED_TO, relatedToDataMap);
dataMap.put(SOURCE, sourceUrn);

return RecordUtils.toRecordTemplate(assetRelationshipClass, dataMap);
}

/**
 * Lower-cases the first character of the given string, e.g. "BelongsToV2" -> "belongsToV2".
 *
 * <p>Uses {@link Character#toLowerCase(char)} rather than {@link String#toLowerCase()} so the result
 * does not depend on the JVM default locale (under a Turkish locale the latter maps "I" to a dotless
 * "ı", which would yield a wrong field name).
 *
 * @param str the string to decapitalize; may be null or empty
 * @return str with its first character lower-cased, or str unchanged if it is null or empty
 */
private static String decapitalize(String str) {
  if (str == null || str.isEmpty()) {
    return str;
  }
  return Character.toLowerCase(str.charAt(0)) + str.substring(1);
}

/**
* Checks if a given entity type has an entity table in the db.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
import com.linkedin.testing.EntityAspectUnion;
import com.linkedin.testing.EntityAspectUnionArray;
import com.linkedin.testing.FooSnapshot;
import com.linkedin.testing.localrelationship.AssetRelationship;
import com.linkedin.testing.localrelationship.BelongsTo;
import com.linkedin.testing.localrelationship.BelongsToV2;
import com.linkedin.testing.localrelationship.ConsumeFrom;
import com.linkedin.testing.localrelationship.EnvorinmentType;
import com.linkedin.testing.localrelationship.OwnedBy;
Expand All @@ -49,7 +51,9 @@
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.naming.OperationNotSupportedException;
Expand All @@ -59,6 +63,7 @@
import org.testng.annotations.Factory;
import org.testng.annotations.Test;

import static com.linkedin.metadata.dao.EbeanLocalRelationshipQueryDAO.*;
import static com.linkedin.testing.TestUtils.*;
import static org.testng.Assert.*;

Expand Down Expand Up @@ -694,6 +699,106 @@ ReportsTo.class, new LocalRelationshipFilter().setCriteria(new LocalRelationship
assertEquals(actual, expected);
}

/**
 * Verifies that findRelationshipsV3 wraps a ReportsTo relationship in an AssetRelationship whose
 * source and relatedTo fields carry the stored relationship.
 */
@Test(dataProvider = "schemaConfig")
public void testFindRelationshipsV3WithRelationshipV1(EbeanLocalDAO.SchemaConfig schemaConfig) throws URISyntaxException {
  FooUrn alice = new FooUrn(1);
  FooUrn bob = new FooUrn(2);

  // Add Alice and Bob into entity tables.
  if (schemaConfig == EbeanLocalDAO.SchemaConfig.NEW_SCHEMA_ONLY) {
    _fooUrnEBeanLocalAccess.add(alice, new AspectFoo().setValue("Alice"), AspectFoo.class, new AuditStamp(), null, false);
    _fooUrnEBeanLocalAccess.add(bob, new AspectFoo().setValue("Bob"), AspectFoo.class, new AuditStamp(), null, false);
  }

  // Add Bob reports-to Alice relationship
  ReportsTo bobReportsToAlice = new ReportsTo().setSource(bob).setDestination(alice);
  _localRelationshipWriterDAO.addRelationships(bob, AspectFoo.class, Collections.singletonList(bobReportsToAlice), false);

  // Find all reports-to relationships for Alice.
  LocalRelationshipFilter destFilter;
  if (schemaConfig == EbeanLocalDAO.SchemaConfig.OLD_SCHEMA_ONLY) {
    // old schema does not support non-urn field filters
    LocalRelationshipCriterion oldSchemaFilterCriterion = EBeanDAOUtils.buildRelationshipFieldCriterion(LocalRelationshipValue.create(alice.toString()),
        Condition.EQUAL,
        new UrnField());
    destFilter = new LocalRelationshipFilter().setCriteria(new LocalRelationshipCriterionArray(oldSchemaFilterCriterion));
  } else {
    LocalRelationshipCriterion filterCriterion = EBeanDAOUtils.buildRelationshipFieldCriterion(LocalRelationshipValue.create("Alice"),
        Condition.EQUAL,
        new AspectField().setAspect(AspectFoo.class.getCanonicalName()).setPath("/value"));
    destFilter = new LocalRelationshipFilter().setCriteria(new LocalRelationshipCriterionArray(filterCriterion));
  }

  _localRelationshipQueryDAO.setSchemaConfig(schemaConfig);

  Map<String, Object> wrapOptions = new HashMap<>();
  wrapOptions.put(RELATIONSHIP_RETURN_TYPE, MG_INTERNAL_ASSET_RELATIONSHIP_TYPE);

  List<AssetRelationship> reportsToAlice = _localRelationshipQueryDAO.findRelationshipsV3(
      null, null, "foo", destFilter,
      ReportsTo.class, new LocalRelationshipFilter().setCriteria(new LocalRelationshipCriterionArray()).setDirection(RelationshipDirection.UNDIRECTED),
      AssetRelationship.class, wrapOptions,
      -1, -1);

  // Fail with a readable assertion instead of an IndexOutOfBoundsException when nothing is returned.
  assertFalse(reportsToAlice.isEmpty());

  AssetRelationship actual = reportsToAlice.get(0);
  assertEquals(actual.getSource(), "urn:li:foo:2");

  ReportsTo actualReportsTo = actual.getRelatedTo().getReportsTo();

  assertNotNull(actualReportsTo);
  assertEquals(actualReportsTo.getSource().toString(), "urn:li:foo:2");
  assertEquals(actualReportsTo.getDestination().toString(), "urn:li:foo:1");
}

/**
 * Verifies that findRelationshipsV3 wraps a BelongsToV2 relationship in an AssetRelationship whose
 * source is the car urn and whose relatedTo union carries the stored destination.
 */
@Test(dataProvider = "schemaConfig")
public void testFindRelationshipsV3WithRelationshipV2(EbeanLocalDAO.SchemaConfig schemaConfig) throws URISyntaxException {
  FooUrn owner = new FooUrn(1);
  FooUrn car = new FooUrn(2);

  // Add the car and the owner into entity tables.
  if (schemaConfig == EbeanLocalDAO.SchemaConfig.NEW_SCHEMA_ONLY) {
    _fooUrnEBeanLocalAccess.add(car, new AspectFoo().setValue("Car"), AspectFoo.class, new AuditStamp(), null, false);
    _fooUrnEBeanLocalAccess.add(owner, new AspectFoo().setValue("Owner"), AspectFoo.class, new AuditStamp(), null, false);
  }

  // Add car belongs-to owner relationship
  BelongsToV2 carBelongsToOwner = new BelongsToV2();
  carBelongsToOwner.setDestination(BelongsToV2.Destination.create(owner.toString()));
  _localRelationshipWriterDAO.addRelationships(car, AspectFoo.class, Collections.singletonList(carBelongsToOwner), false);

  // Find all belongs-to relationships for the owner.
  LocalRelationshipFilter destFilter;
  if (schemaConfig == EbeanLocalDAO.SchemaConfig.OLD_SCHEMA_ONLY) {
    // old schema does not support non-urn field filters
    LocalRelationshipCriterion oldSchemaFilterCriterion = EBeanDAOUtils.buildRelationshipFieldCriterion(LocalRelationshipValue.create(owner.toString()),
        Condition.EQUAL,
        new UrnField());
    destFilter = new LocalRelationshipFilter().setCriteria(new LocalRelationshipCriterionArray(oldSchemaFilterCriterion));
  } else {
    LocalRelationshipCriterion filterCriterion = EBeanDAOUtils.buildRelationshipFieldCriterion(LocalRelationshipValue.create("Owner"),
        Condition.EQUAL,
        new AspectField().setAspect(AspectFoo.class.getCanonicalName()).setPath("/value"));
    destFilter = new LocalRelationshipFilter().setCriteria(new LocalRelationshipCriterionArray(filterCriterion));
  }

  _localRelationshipQueryDAO.setSchemaConfig(schemaConfig);

  Map<String, Object> wrapOptions = new HashMap<>();
  wrapOptions.put(RELATIONSHIP_RETURN_TYPE, MG_INTERNAL_ASSET_RELATIONSHIP_TYPE);

  List<AssetRelationship> belongsToOwner = _localRelationshipQueryDAO.findRelationshipsV3(
      null, null, "foo", destFilter,
      BelongsToV2.class, new LocalRelationshipFilter().setCriteria(new LocalRelationshipCriterionArray()).setDirection(RelationshipDirection.UNDIRECTED),
      AssetRelationship.class, wrapOptions,
      -1, -1);

  // Fail with a readable assertion instead of an IndexOutOfBoundsException when nothing is returned.
  assertFalse(belongsToOwner.isEmpty());

  AssetRelationship actual = belongsToOwner.get(0);
  assertEquals(actual.getSource(), "urn:li:foo:2");

  BelongsToV2 actualBelongsToV2 = actual.getRelatedTo().getBelongsToV2();
  assertNotNull(actualBelongsToV2);
  assertEquals(actualBelongsToV2.getDestination().getString(), owner.toString());
}

@Test
public void testIsMgEntityType() throws Exception {
// EbeanLocalRelationshipQueryDAOTest does not have the same package as EbeanLocalRelationshipQueryDAO (cant access protected method directly).
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
DROP TABLE IF EXISTS metadata_relationship_belongsto;
DROP TABLE IF EXISTS metadata_relationship_belongstov2;
DROP TABLE IF EXISTS metadata_relationship_reportsto;
DROP TABLE IF EXISTS metadata_relationship_ownedby;
DROP TABLE IF EXISTS metadata_relationship_pairswith;
Expand All @@ -22,6 +23,20 @@ CREATE TABLE IF NOT EXISTS metadata_relationship_belongsto (
PRIMARY KEY (id)
);

-- Relationship table "belongstov2"; presumably backs the BelongsToV2 test relationship, following
-- the metadata_relationship_<type> naming of the neighbouring tables (confirm against the writer DAO).
CREATE TABLE IF NOT EXISTS metadata_relationship_belongstov2 (
    id               BIGINT        NOT NULL AUTO_INCREMENT,
    metadata         LONGTEXT      NOT NULL,
    source           VARCHAR(1000) NOT NULL,
    source_type      VARCHAR(100)  NOT NULL,
    destination      VARCHAR(1000) NOT NULL,
    destination_type VARCHAR(100)  NOT NULL,
    lastmodifiedon   TIMESTAMP     NOT NULL,
    lastmodifiedby   VARCHAR(255)  NOT NULL,
    deleted_ts       DATETIME(6)   DEFAULT NULL,
    aspect           VARCHAR(200)  DEFAULT NULL, -- should be NOT NULL in production use cases
    PRIMARY KEY (id)
);

CREATE TABLE IF NOT EXISTS metadata_relationship_reportsto (
id BIGINT NOT NULL AUTO_INCREMENT,
metadata JSON NOT NULL,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
DROP TABLE IF EXISTS metadata_relationship_belongsto;
DROP TABLE IF EXISTS metadata_relationship_belongstov2;
DROP TABLE IF EXISTS metadata_relationship_reportsto;
DROP TABLE IF EXISTS metadata_relationship_ownedby;
DROP TABLE IF EXISTS metadata_relationship_pairswith;
Expand All @@ -22,6 +23,20 @@ CREATE TABLE IF NOT EXISTS metadata_relationship_belongsto (
PRIMARY KEY (id)
);

-- Relationship table "belongstov2"; presumably backs the BelongsToV2 test relationship, following
-- the metadata_relationship_<type> naming of the neighbouring tables (confirm against the writer DAO).
CREATE TABLE IF NOT EXISTS metadata_relationship_belongstov2 (
    id               BIGINT        NOT NULL AUTO_INCREMENT,
    metadata         LONGTEXT      NOT NULL,
    source           VARCHAR(1000) NOT NULL,
    source_type      VARCHAR(100)  NOT NULL,
    destination      VARCHAR(1000) NOT NULL,
    destination_type VARCHAR(100)  NOT NULL,
    lastmodifiedon   TIMESTAMP     NOT NULL,
    lastmodifiedby   VARCHAR(255)  NOT NULL,
    deleted_ts       DATETIME(6)   DEFAULT NULL,
    aspect           VARCHAR(200)  DEFAULT NULL, -- should be NOT NULL in production use cases
    PRIMARY KEY (id)
);

CREATE TABLE IF NOT EXISTS metadata_relationship_reportsto (
id BIGINT NOT NULL AUTO_INCREMENT,
metadata JSON NOT NULL,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
namespace com.linkedin.testing.localrelationship


/**
* Wrapper record for a single relationship: pairs the source asset urn with an optional union
* holding one of the supported relationship models (ReportsTo or BelongsToV2).
*/
record AssetRelationship {

/**
* Source asset urn in the relationship. Optional; defaults to the empty string.
*/
source: optional string = ""

/**
* The union of specific relationship models, which contains the destination entity reference
* and other relationship specific attributes, such as lineageType in DownstreamOf relationship.
* Each member is keyed by an alias (reportsTo, belongsToV2) naming the relationship model it holds.
*/
relatedTo: optional union[
reportsTo: ReportsTo,
belongsToV2: BelongsToV2
]
}