Skip to content

Commit

Permalink
add findRelationshipsV3 API
Browse files Browse the repository at this point in the history
  • Loading branch information
ybz1013 committed Mar 6, 2025
1 parent cf60ec0 commit edc45e1
Show file tree
Hide file tree
Showing 5 changed files with 258 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.metadata.dao;

import com.linkedin.data.DataMap;
import com.google.common.annotations.VisibleForTesting;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.UnionTemplate;
Expand Down Expand Up @@ -36,6 +37,9 @@
*/
@Slf4j
public class EbeanLocalRelationshipQueryDAO {
public static final String RELATED_TO = "relatedTo";
public static final String SOURCE = "source";
public static final String METADATA = "metadata";
private final EbeanServer _server;
private final MultiHopsTraversalSqlGenerator _sqlGenerator;

Expand Down Expand Up @@ -222,6 +226,34 @@ public <RELATIONSHIP extends RecordTemplate> List<RELATIONSHIP> findRelationship
@Nullable String destinationEntityType, @Nullable LocalRelationshipFilter destinationEntityFilter,
@Nonnull Class<RELATIONSHIP> relationshipType, @Nonnull LocalRelationshipFilter relationshipFilter,
int offset, int count) {
List<SqlRow> sqlRows = findRelationshipsV2V3Core(
sourceEntityType, sourceEntityFilter, destinationEntityType, destinationEntityFilter,
relationshipType, relationshipFilter, offset, count);

return sqlRows.stream()
.map(row -> RecordUtils.toRecordTemplate(relationshipType, row.getString(METADATA)))
.collect(Collectors.toList());
}

/**
* Fetches a list of SqlRow of relationships of a specific type (Urn) based on the given filters if applicable.
*
* @param sourceEntityType type of source entity to query (e.g. "dataset")
* @param sourceEntityFilter the filter to apply to the source entity when querying (not applicable to non-MG entities)
* @param destinationEntityType type of destination entity to query (e.g. "dataset")
* @param destinationEntityFilter the filter to apply to the destination entity when querying (not applicable to non-MG entities)
* @param relationshipType the type of relationship to query
* @param relationshipFilter the filter to apply to relationship when querying
* @param offset the offset query should start at. Ignored if set to a negative value.
* @param count the maximum number of entities to return. Ignored if set to a non-positive value.
* @return A list of relationship records in SqlRow (col: source, destination, metadata, etc).
*/
@Nonnull
public <RELATIONSHIP extends RecordTemplate> List<SqlRow> findRelationshipsV2V3Core(
@Nullable String sourceEntityType, @Nullable LocalRelationshipFilter sourceEntityFilter,
@Nullable String destinationEntityType, @Nullable LocalRelationshipFilter destinationEntityFilter,
@Nonnull Class<RELATIONSHIP> relationshipType, @Nonnull LocalRelationshipFilter relationshipFilter,
int offset, int count) {
validateEntityTypeAndFilter(sourceEntityFilter, sourceEntityType);
validateEntityTypeAndFilter(destinationEntityFilter, destinationEntityType);
validateRelationshipFilter(relationshipFilter);
Expand All @@ -237,11 +269,86 @@ public <RELATIONSHIP extends RecordTemplate> List<RELATIONSHIP> findRelationship
destTableName, destinationEntityFilter,
count, offset);

return _server.createSqlQuery(sql).findList().stream()
.map(row -> RecordUtils.toRecordTemplate(relationshipType, row.getString("metadata")))
return _server.createSqlQuery(sql).findList();
}

/**
* Finds a list of relationships of a specific type (Urn) based on the given filters if applicable.
* Similar to findRelationshipsV2, but this method wraps the relationship in a specific class provided by user.
* The intended use case is for MG internally with AssetRelationship, but since it is an open API, we are leaving room for extendability.
*
* @param sourceEntityType type of source entity to query (e.g. "dataset")
* @param sourceEntityFilter the filter to apply to the source entity when querying (not applicable to non-MG entities)
* @param destinationEntityType type of destination entity to query (e.g. "dataset")
* @param destinationEntityFilter the filter to apply to the destination entity when querying (not applicable to non-MG entities)
* @param relationshipType the type of relationship to query
* @param relationshipFilter the filter to apply to relationship when querying
* @param assetRelationshipClass the wrapper class for the relationship type
* @param wrapOptions options to wrap the relationship. Currently unused. Leaving it open for the future.
* @param offset the offset query should start at. Ignored if set to a negative value.
* @param count the maximum number of entities to return. Ignored if set to a non-positive value.
* @return A list of relationship records.
*/
@Nonnull
public <ASSET_RELATIONSHIP extends RecordTemplate, RELATIONSHIP extends RecordTemplate> List<ASSET_RELATIONSHIP> findRelationshipsV3(
@Nullable String sourceEntityType, @Nullable LocalRelationshipFilter sourceEntityFilter,
@Nullable String destinationEntityType, @Nullable LocalRelationshipFilter destinationEntityFilter,
@Nonnull Class<RELATIONSHIP> relationshipType, @Nonnull LocalRelationshipFilter relationshipFilter,
@Nonnull Class<ASSET_RELATIONSHIP> assetRelationshipClass, @Nullable Map<String, Object> wrapOptions,
int offset, int count) {
if (wrapOptions == null) {
throw new IllegalArgumentException("Please check your use of the findRelationshipsV3 method. wrapOptions cannot be null.");
}

List<SqlRow> sqlRows = findRelationshipsV2V3Core(
sourceEntityType, sourceEntityFilter, destinationEntityType, destinationEntityFilter,
relationshipType, relationshipFilter, offset, count);

return sqlRows.stream()
.map(row -> createAssetRelationshipWrapperForRelationship(
relationshipType, assetRelationshipClass, row.getString(METADATA), row.getString(SOURCE), wrapOptions))
.collect(Collectors.toList());
}

/**
* Wraps the relationship in a specific class provided by user.
* The intended use case is for MG internally with AssetRelationship, but since it is an open API, we are leaving room for extendability.
*
* @param relationshipType the type of relationship to query
* @param assetRelationshipClass the wrapper class for the relationship type. By default, AssetRelationship.
* @param metadata the metadata string which can be parsed into a relationship
* @param sourceUrn the source urn
* @param wrapOptions options to wrap the relationship. Currently unused. Leaving it open for the future.
* @return A wrapped relationship record.
*/
@Nonnull
private <ASSET_RELATIONSHIP extends RecordTemplate, RELATIONSHIP extends RecordTemplate> ASSET_RELATIONSHIP createAssetRelationshipWrapperForRelationship(
@Nonnull Class<RELATIONSHIP> relationshipType, @Nonnull Class<ASSET_RELATIONSHIP> assetRelationshipClass,
@Nonnull String metadata, @Nonnull String sourceUrn, @Nullable Map<String, Object> wrapOptions) {
// TODO: if other type of ASSET_RELATIONSHIP is needed, we need to distinguish it with wrapOptions and handles differently.

// parse metadata json string into DataMap
final DataMap relationshipDataMap = RecordUtils.toDataMap(metadata);

final DataMap relatedToDataMap = new DataMap();
// e.g. "BelongsToV2" -> "belongsToV2"
final String relationshipName = decapitalize(relationshipType.getSimpleName());
relatedToDataMap.put(relationshipName, relationshipDataMap);

final DataMap dataMap = new DataMap();
dataMap.put(RELATED_TO, relatedToDataMap);
dataMap.put(SOURCE, sourceUrn);

return RecordUtils.toRecordTemplate(assetRelationshipClass, dataMap);
}

private static String decapitalize(String str) {
if (str == null || str.isEmpty()) {
return str;
}
return str.substring(0, 1).toLowerCase() + str.substring(1);
}

/**
* Checks if a given entity type has an entity table in the db.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
import com.linkedin.testing.EntityAspectUnion;
import com.linkedin.testing.EntityAspectUnionArray;
import com.linkedin.testing.FooSnapshot;
import com.linkedin.testing.localrelationship.AssetRelationship;
import com.linkedin.testing.localrelationship.BelongsTo;
import com.linkedin.testing.localrelationship.BelongsToV2;
import com.linkedin.testing.localrelationship.ConsumeFrom;
import com.linkedin.testing.localrelationship.EnvorinmentType;
import com.linkedin.testing.localrelationship.OwnedBy;
Expand All @@ -49,6 +51,7 @@
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -694,6 +697,100 @@ ReportsTo.class, new LocalRelationshipFilter().setCriteria(new LocalRelationship
assertEquals(actual, expected);
}

@Test(dataProvider = "schemaConfig")
public void testFindRelationshipsV3WithRelationshipV1(EbeanLocalDAO.SchemaConfig schemaConfig) throws URISyntaxException {
FooUrn alice = new FooUrn(1);
FooUrn bob = new FooUrn(2);

// Add Alice, Bob and Jack into entity tables.
if (schemaConfig == EbeanLocalDAO.SchemaConfig.NEW_SCHEMA_ONLY) {
_fooUrnEBeanLocalAccess.add(alice, new AspectFoo().setValue("Alice"), AspectFoo.class, new AuditStamp(), null, false);
_fooUrnEBeanLocalAccess.add(bob, new AspectFoo().setValue("Bob"), AspectFoo.class, new AuditStamp(), null, false);
}

// Add Bob reports-to ALice relationship
ReportsTo bobReportsToAlice = new ReportsTo().setSource(bob).setDestination(alice);
_localRelationshipWriterDAO.addRelationships(bob, AspectFoo.class, Collections.singletonList(bobReportsToAlice), false);

// Find all reports-to relationship for Alice.
LocalRelationshipFilter destFilter;
if (schemaConfig == EbeanLocalDAO.SchemaConfig.OLD_SCHEMA_ONLY) {
// old schema does not support non-urn field filters
LocalRelationshipCriterion oldSchemaFilterCriterion = EBeanDAOUtils.buildRelationshipFieldCriterion(LocalRelationshipValue.create(alice.toString()),
Condition.EQUAL,
new UrnField());
destFilter = new LocalRelationshipFilter().setCriteria(new LocalRelationshipCriterionArray(oldSchemaFilterCriterion));
} else {
LocalRelationshipCriterion filterCriterion = EBeanDAOUtils.buildRelationshipFieldCriterion(LocalRelationshipValue.create("Alice"),
Condition.EQUAL,
new AspectField().setAspect(AspectFoo.class.getCanonicalName()).setPath("/value"));
destFilter = new LocalRelationshipFilter().setCriteria(new LocalRelationshipCriterionArray(filterCriterion));
}

_localRelationshipQueryDAO.setSchemaConfig(schemaConfig);

List<AssetRelationship> reportsToAlice = _localRelationshipQueryDAO.findRelationshipsV3(
null, null, "foo", destFilter,
ReportsTo.class, new LocalRelationshipFilter().setCriteria(new LocalRelationshipCriterionArray()).setDirection(RelationshipDirection.UNDIRECTED),
AssetRelationship.class, new HashMap<>(),
-1, -1);

AssetRelationship expected = reportsToAlice.get(0);
assertEquals(expected.getSource(), "urn:li:foo:2");

ReportsTo expectedReportsTo = expected.getRelatedTo().getReportsTo();

assertNotNull(expectedReportsTo);
assertEquals(expectedReportsTo.getSource().toString(), "urn:li:foo:2");
assertEquals(expectedReportsTo.getDestination().toString(), "urn:li:foo:1");
}

@Test(dataProvider = "schemaConfig")
public void testFindRelationshipsV3WithRelationshipV2(EbeanLocalDAO.SchemaConfig schemaConfig) throws URISyntaxException {
FooUrn owner = new FooUrn(1);
FooUrn car = new FooUrn(2);

// Add Alice, Bob and Jack into entity tables.
if (schemaConfig == EbeanLocalDAO.SchemaConfig.NEW_SCHEMA_ONLY) {
_fooUrnEBeanLocalAccess.add(car, new AspectFoo().setValue("Car"), AspectFoo.class, new AuditStamp(), null, false);
_fooUrnEBeanLocalAccess.add(owner, new AspectFoo().setValue("Owner"), AspectFoo.class, new AuditStamp(), null, false);
}

// Add Bob reports-to ALice relationship
BelongsToV2 carBelongsToOwner = new BelongsToV2();
carBelongsToOwner.setDestination(BelongsToV2.Destination.create(owner.toString()));
_localRelationshipWriterDAO.addRelationships(car, AspectFoo.class, Collections.singletonList(carBelongsToOwner), false);

// Find all reports-to relationship for Alice.
LocalRelationshipFilter destFilter;
if (schemaConfig == EbeanLocalDAO.SchemaConfig.OLD_SCHEMA_ONLY) {
// old schema does not support non-urn field filters
LocalRelationshipCriterion oldSchemaFilterCriterion = EBeanDAOUtils.buildRelationshipFieldCriterion(LocalRelationshipValue.create(owner.toString()),
Condition.EQUAL,
new UrnField());
destFilter = new LocalRelationshipFilter().setCriteria(new LocalRelationshipCriterionArray(oldSchemaFilterCriterion));
} else {
LocalRelationshipCriterion filterCriterion = EBeanDAOUtils.buildRelationshipFieldCriterion(LocalRelationshipValue.create("Owner"),
Condition.EQUAL,
new AspectField().setAspect(AspectFoo.class.getCanonicalName()).setPath("/value"));
destFilter = new LocalRelationshipFilter().setCriteria(new LocalRelationshipCriterionArray(filterCriterion));
}

_localRelationshipQueryDAO.setSchemaConfig(schemaConfig);

List<AssetRelationship> belongsToOwner = _localRelationshipQueryDAO.findRelationshipsV3(
null, null, "foo", destFilter,
BelongsToV2.class, new LocalRelationshipFilter().setCriteria(new LocalRelationshipCriterionArray()).setDirection(RelationshipDirection.UNDIRECTED),
AssetRelationship.class, new HashMap<>(),
-1, -1);

AssetRelationship expected = belongsToOwner.get(0);
assertEquals(expected.getSource(), "urn:li:foo:2");

BelongsToV2 expectedBelongsToV2 = expected.getRelatedTo().getBelongsToV2();
assertEquals(expectedBelongsToV2.getDestination().getString(), owner.toString());
}

@Test
public void testIsMgEntityType() throws Exception {
// EbeanLocalRelationshipQueryDAOTest does not have the same package as EbeanLocalRelationshipQueryDAO (cant access protected method directly).
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
DROP TABLE IF EXISTS metadata_relationship_belongsto;
DROP TABLE IF EXISTS metadata_relationship_belongstov2;
DROP TABLE IF EXISTS metadata_relationship_reportsto;
DROP TABLE IF EXISTS metadata_relationship_ownedby;
DROP TABLE IF EXISTS metadata_relationship_pairswith;
Expand All @@ -22,6 +23,20 @@ CREATE TABLE IF NOT EXISTS metadata_relationship_belongsto (
PRIMARY KEY (id)
);

CREATE TABLE IF NOT EXISTS metadata_relationship_belongstov2 (
id BIGINT NOT NULL AUTO_INCREMENT,
metadata LONGTEXT NOT NULL,
source VARCHAR(1000) NOT NULL,
source_type VARCHAR(100) NOT NULL,
destination VARCHAR(1000) NOT NULL,
destination_type VARCHAR(100) NOT NULL,
lastmodifiedon TIMESTAMP NOT NULL,
lastmodifiedby VARCHAR(255) NOT NULL,
deleted_ts DATETIME(6) DEFAULT NULL,
aspect VARCHAR(200) DEFAULT NULL, -- should be NOT NULL in production use cases
PRIMARY KEY (id)
);

CREATE TABLE IF NOT EXISTS metadata_relationship_reportsto (
id BIGINT NOT NULL AUTO_INCREMENT,
metadata JSON NOT NULL,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
DROP TABLE IF EXISTS metadata_relationship_belongsto;
DROP TABLE IF EXISTS metadata_relationship_belongstov2;
DROP TABLE IF EXISTS metadata_relationship_reportsto;
DROP TABLE IF EXISTS metadata_relationship_ownedby;
DROP TABLE IF EXISTS metadata_relationship_pairswith;
Expand All @@ -22,6 +23,20 @@ CREATE TABLE IF NOT EXISTS metadata_relationship_belongsto (
PRIMARY KEY (id)
);

CREATE TABLE IF NOT EXISTS metadata_relationship_belongstov2 (
id BIGINT NOT NULL AUTO_INCREMENT,
metadata LONGTEXT NOT NULL,
source VARCHAR(1000) NOT NULL,
source_type VARCHAR(100) NOT NULL,
destination VARCHAR(1000) NOT NULL,
destination_type VARCHAR(100) NOT NULL,
lastmodifiedon TIMESTAMP NOT NULL,
lastmodifiedby VARCHAR(255) NOT NULL,
deleted_ts DATETIME(6) DEFAULT NULL,
aspect VARCHAR(200) DEFAULT NULL, -- should be NOT NULL in production use cases
PRIMARY KEY (id)
);

CREATE TABLE IF NOT EXISTS metadata_relationship_reportsto (
id BIGINT NOT NULL AUTO_INCREMENT,
metadata JSON NOT NULL,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
namespace com.linkedin.testing.localrelationship


/**
* A union of all supported relationship types.
*/
record AssetRelationship {

/**
* Source asset urn in the relationship
*/
source: optional string = ""

/**
* The union of specific relationship models, which contains the destination entity reference
* and other relationship specific attributes, such as lineageType in DownstreamOf relationship
*/
relatedTo: optional union[
reportsTo: ReportsTo,
belongsToV2: BelongsToV2
]
}

0 comments on commit edc45e1

Please sign in to comment.