From 3b89a78f2db4e4bad39a79434da2b9f8cab1f6d4 Mon Sep 17 00:00:00 2001 From: Jin Hyuk Chang Date: Tue, 5 Mar 2019 10:52:29 -0800 Subject: [PATCH] [DPTOOLS-1903] Remove stale data in ES index (#15) --- databuilder/extractor/neo4j_extractor.py | 1 + .../extractor/neo4j_search_data_extractor.py | 24 +++++++++++++++++-- .../table_column_usage_aggregate_extractor.py | 1 - setup.py | 2 +- .../test_neo4j_search_data_extractor.py | 23 ++++++++++++++++++ 5 files changed, 47 insertions(+), 4 deletions(-) create mode 100644 tests/unit/extractor/test_neo4j_search_data_extractor.py diff --git a/databuilder/extractor/neo4j_extractor.py b/databuilder/extractor/neo4j_extractor.py index 9814e2c98..6fa9c6af0 100644 --- a/databuilder/extractor/neo4j_extractor.py +++ b/databuilder/extractor/neo4j_extractor.py @@ -63,6 +63,7 @@ def _execute_query(self, tx): """ Create an iterator to execute sql. """ + LOGGER.info('Executing query {}'.format(self.cypher_query)) result = tx.run(self.cypher_query) return result diff --git a/databuilder/extractor/neo4j_search_data_extractor.py b/databuilder/extractor/neo4j_search_data_extractor.py index 04973e101..1dd0ecd81 100644 --- a/databuilder/extractor/neo4j_search_data_extractor.py +++ b/databuilder/extractor/neo4j_search_data_extractor.py @@ -6,6 +6,7 @@ from databuilder import Scoped from databuilder.extractor.base_extractor import Extractor from databuilder.extractor.neo4j_extractor import Neo4jExtractor +from databuilder.publisher.neo4j_csv_publisher import JOB_PUBLISH_TAG class Neo4jSearchDataExtractor(Extractor): @@ -18,6 +19,7 @@ class Neo4jSearchDataExtractor(Extractor): DEFAULT_NEO4J_CYPHER_QUERY = textwrap.dedent( """ MATCH (db:Database)<-[:CLUSTER_OF]-(cluster:Cluster)<-[:SCHEMA_OF]-(schema:Schema)<-[:TABLE_OF]-(table:Table) + {publish_tag_filter} OPTIONAL MATCH (table)-[:DESCRIPTION]->(table_description:Description) OPTIONAL MATCH (table)-[read:READ_BY]->(user:User) OPTIONAL MATCH (table)-[:COLUMN]->(cols:Column) @@ -44,8 +46,11 @@ def init(self, conf): self.conf = conf # extract cypher query from conf, if specified, else use default query - self.cypher_query = conf.get_string(Neo4jSearchDataExtractor.CYPHER_QUERY_CONFIG_KEY, - Neo4jSearchDataExtractor.DEFAULT_NEO4J_CYPHER_QUERY) + if Neo4jSearchDataExtractor.CYPHER_QUERY_CONFIG_KEY in conf: + self.cypher_query = conf.get_string(Neo4jSearchDataExtractor.CYPHER_QUERY_CONFIG_KEY) + else: + self.cypher_query = self._add_publish_tag_filter(conf.get_string(JOB_PUBLISH_TAG, ''), + Neo4jSearchDataExtractor.DEFAULT_NEO4J_CYPHER_QUERY) self.neo4j_extractor = Neo4jExtractor() # write the cypher query in configs in Neo4jExtractor scope @@ -72,3 +77,18 @@ def extract(self): def get_scope(self): # type: () -> str return 'extractor.search_data' + + def _add_publish_tag_filter(self, publish_tag, cypher_query): + """ + Adds publish tag filter into Cypher query + :param publish_tag: value of publish tag. + :param cypher_query: + :return: + """ + # type: (str, str) -> str + if not publish_tag: + publish_tag_filter = '' + else: + publish_tag_filter = """WHERE table.published_tag = '{}'""".format(publish_tag) + + return cypher_query.format(publish_tag_filter=publish_tag_filter) diff --git a/databuilder/extractor/table_column_usage_aggregate_extractor.py b/databuilder/extractor/table_column_usage_aggregate_extractor.py index 096c8f6f5..0e62bc5c6 100644 --- a/databuilder/extractor/table_column_usage_aggregate_extractor.py +++ b/databuilder/extractor/table_column_usage_aggregate_extractor.py @@ -35,7 +35,6 @@ class TblColUsgAggExtractor(Extractor): def init(self, conf): # type: (ConfigTree) -> None - self._extractor = conf.get(RAW_EXTRACTOR) # type: Extractor self._extractor.init(Scoped.get_scoped_conf(conf, self._extractor.get_scope())) diff --git a/setup.py b/setup.py index eb36cd206..5911af018 100644 --- a/setup.py +++ b/setup.py @@ -1,7 +1,7 @@ from setuptools import setup, find_packages -__version__ = '1.0.5' +__version__ = '1.0.6' setup( diff --git a/tests/unit/extractor/test_neo4j_search_data_extractor.py b/tests/unit/extractor/test_neo4j_search_data_extractor.py new file mode 100644 index 000000000..31f021181 --- /dev/null +++ b/tests/unit/extractor/test_neo4j_search_data_extractor.py @@ -0,0 +1,23 @@ +import unittest +from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor + + +class TestNeo4jExtractor(unittest.TestCase): + + def test_adding_filter(self): + # type: (Any) -> None + extractor = Neo4jSearchDataExtractor() + actual = extractor._add_publish_tag_filter('foo', 'MATCH (table:Table) {publish_tag_filter} RETURN table') + + self.assertEqual(actual, """MATCH (table:Table) WHERE table.published_tag = 'foo' RETURN table""") + + def test_not_adding_filter(self): + # type: (Any) -> None + extractor = Neo4jSearchDataExtractor() + actual = extractor._add_publish_tag_filter('', 'MATCH (table:Table) {publish_tag_filter} RETURN table') + + self.assertEqual(actual, """MATCH (table:Table) RETURN table""") + + +if __name__ == '__main__': + unittest.main()