Skip to content

Commit

Permalink
Refactor Neo4jProxy table owners query for easier customization (#2182)
Browse files Browse the repository at this point in the history
* refactor _exec_owners_query and get_table

Signed-off-by: Ben Dye <[email protected]>

* remove prior owners logic and references

Signed-off-by: Ben Dye <[email protected]>

* WIP

Signed-off-by: Ben Dye <[email protected]>

* update tests

Signed-off-by: Ben Dye <[email protected]>

* Add comment explaining testing-mock logic

Signed-off-by: Ben Dye <[email protected]>

* Cleanups

Signed-off-by: Ben Dye <[email protected]>

* Bump version

Signed-off-by: Ben Dye <[email protected]>

---------

Signed-off-by: Ben Dye <[email protected]>
  • Loading branch information
B-T-D authored Aug 11, 2023
1 parent 00377a3 commit 8c3ceb7
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 21 deletions.
39 changes: 26 additions & 13 deletions metadata/metadata_service/proxy/neo4j_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,9 @@ def get_table(self, *, table_uri: str) -> Table:
cols, last_neo4j_record = self._exec_col_query(table_uri)

readers = self._exec_usage_query(table_uri)
owners = self._exec_owners_query(table_uri)

wmk_results, table_writer, table_apps, timestamp_value, owners, tags, source, \
wmk_results, table_writer, table_apps, timestamp_value, tags, source, \
badges, prog_descs, resource_reports = self._exec_table_query(table_uri)

joins, filters = self._exec_table_query_query(table_uri)
Expand Down Expand Up @@ -340,22 +341,41 @@ def _exec_usage_query(self, table_uri: str) -> List[Reader]:

return readers

@timer_with_counter
def _exec_owners_query(self, table_uri: str) -> List[User]:
    """
    Look up the owners of a table.

    Executes one Cypher statement that collects the distinct User nodes
    reached through an OWNER relationship from the table identified by
    ``table_uri``. Each collected node's ``email`` is then passed to
    ``_get_user_details`` and the result is converted with
    ``_build_user_from_record``.

    :param table_uri: value bound to ``$tbl_key`` to match the Table node's key
    :return: one User per entry in the record's ``owner_records`` (empty list
             when that field is absent)
    """
    # textwrap.dedent strips the common indentation, so the statement sent to
    # the database does not depend on how deeply this literal is nested.
    statement = textwrap.dedent("""
        MATCH (owner:User)<-[:OWNER]-(tbl:Table {key: $tbl_key})
        RETURN collect(distinct owner) as owner_records
    """)
    query_result = self._execute_cypher_query(statement=statement,
                                              param_dict={'tbl_key': table_uri})

    # The query aggregates with collect(), so a single record is expected.
    single_record = get_single_record(query_result)

    # For every owner node: fetch the user details first, then build the User.
    return [
        self._build_user_from_record(
            record=self._get_user_details(user_id=owner_node['email'])
        )
        for owner_node in single_record.get('owner_records', [])
    ]

@timer_with_counter
def _exec_table_query(self, table_uri: str) -> Tuple:
"""
Queries one Cypher record with watermark list, Application,
,timestamp, owner records and tag records.
,timestamp, and tag records.
"""

# Return Value: (Watermark Results, Table Writer, Last Updated Timestamp, owner records, tag records)
# Return Value: (Watermark Results, Table Writer, Last Updated Timestamp, tag records)

table_level_query = textwrap.dedent("""\
MATCH (tbl:Table {key: $tbl_key})
OPTIONAL MATCH (wmk:Watermark)-[:BELONG_TO_TABLE]->(tbl)
OPTIONAL MATCH (app_producer:Application)-[:GENERATES]->(tbl)
OPTIONAL MATCH (app_consumer:Application)-[:CONSUMES]->(tbl)
OPTIONAL MATCH (tbl)-[:LAST_UPDATED_AT]->(t:Timestamp)
OPTIONAL MATCH (owner:User)<-[:OWNER]-(tbl)
OPTIONAL MATCH (tbl)-[:TAGGED_BY]->(tag:Tag{tag_type: $tag_normal_type})
OPTIONAL MATCH (tbl)-[:HAS_BADGE]->(badge:Badge)
OPTIONAL MATCH (tbl)-[:SOURCE]->(src:Source)
Expand All @@ -365,7 +385,6 @@ def _exec_table_query(self, table_uri: str) -> Tuple:
collect(distinct app_producer) as producing_apps,
collect(distinct app_consumer) as consuming_apps,
t.last_updated_timestamp as last_updated_timestamp,
collect(distinct owner) as owner_records,
collect(distinct tag) as tag_records,
collect(distinct badge) as badge_records,
src,
Expand Down Expand Up @@ -405,12 +424,6 @@ def _exec_table_query(self, table_uri: str) -> Tuple:

timestamp_value = table_records['last_updated_timestamp']

owner_record = []

for owner in table_records.get('owner_records', []):
owner_data = self._get_user_details(user_id=owner['email'])
owner_record.append(self._build_user_from_record(record=owner_data))

src = None

if table_records['src']:
Expand All @@ -423,7 +436,7 @@ def _exec_table_query(self, table_uri: str) -> Tuple:

resource_reports = self._extract_resource_reports_from_query(table_records.get('resource_reports', []))

return wmk_results, table_writer, table_apps, timestamp_value, owner_record,\
return wmk_results, table_writer, table_apps, timestamp_value,\
tags, src, badges, prog_descriptions, resource_reports

@timer_with_counter
Expand All @@ -434,7 +447,7 @@ def _exec_table_query_query(self, table_uri: str) -> Tuple:
on the table.
"""

# Return Value: (Watermark Results, Table Writer, Last Updated Timestamp, owner records, tag records)
# Return Value: (Watermark Results, Table Writer, Last Updated Timestamp, tag records)
table_query_level_query = textwrap.dedent("""
MATCH (tbl:Table {key: $tbl_key})
OPTIONAL MATCH (tbl)-[:COLUMN]->(col:Column)-[COLUMN_JOINS_WITH]->(j:Join)
Expand Down
2 changes: 1 addition & 1 deletion metadata/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from setuptools import find_packages, setup

__version__ = '3.12.1'
__version__ = '3.12.2'

requirements_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'requirements.txt')
with open(requirements_path) as requirements_file:
Expand Down
20 changes: 13 additions & 7 deletions metadata/tests/unit/proxy/test_neo4j_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,6 @@ def setUp(self) -> None:
}
],
'last_updated_timestamp': 1,
'owner_records': [
{
'key': '[email protected]',
'email': '[email protected]',
'updated_at': 0,
}
],
'tag_records': [
{
'key': 'test',
Expand Down Expand Up @@ -236,6 +229,14 @@ def setUp(self) -> None:
]
}]

owners_results = [{'owner_records': [
{
'key': '[email protected]',
'email': '[email protected]',
'updated_at': 0,
}
], }]

last_updated_timestamp = '01'

self.col_usage_return_value = [
Expand All @@ -250,6 +251,8 @@ def setUp(self) -> None:

self.table_common_usage = table_common_usage

self.owners_return_value = owners_results

self.col_bar_id_1_expected_type_metadata = self._get_col_bar_id_1_expected_type_metadata()
self.col_bar_id_2_expected_type_metadata = self._get_col_bar_id_2_expected_type_metadata()

Expand Down Expand Up @@ -355,9 +358,11 @@ def test_health_neo4j(self) -> None:

def test_get_table(self) -> None:
with patch.object(GraphDatabase, 'driver'), patch.object(Neo4jProxy, '_execute_cypher_query') as mock_execute:
# mock database return values such that we match ordering of queries executed in Neo4jProxy.get_table
mock_execute.side_effect = [
self.col_usage_return_value,
[],
self.owners_return_value,
self.table_level_return_value,
self.table_common_usage,
[]
Expand Down Expand Up @@ -445,6 +450,7 @@ def test_get_table_view_only(self) -> None:
mock_execute.side_effect = [
col_usage_return_value,
[],
self.owners_return_value,
self.table_level_return_value,
self.table_common_usage,
[]
Expand Down

0 comments on commit 8c3ceb7

Please sign in to comment.