Stale removal via timestamp (#235)
* Neo4jStalenessRemovalTask to support removal via timestamp

* Flake8

* Update

* More unit test

* Update

* Flake 8

* Update

* Add doc
jinhyukchang authored Mar 31, 2020
1 parent 2ee56a6 commit 526cf08
Showing 4 changed files with 468 additions and 44 deletions.
63 changes: 63 additions & 0 deletions README.md
@@ -445,3 +445,66 @@ With this pattern RestApiQuery supports 1:1 and 1:N JOIN relationship.
(GROUP BY or any other aggregation, sub-query join is not supported)

To see in action, take a peek at [ModeDashboardExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/dashboard/mode_dashboard_extractor.py)


### Removing stale data in Neo4j -- [Neo4jStalenessRemovalTask](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/task/neo4j_staleness_removal_task.py):

As Databuilder ingestion mostly consists of INSERT or UPDATE operations, data that has been removed from the metadata source can still linger in the Neo4j database. Neo4jStalenessRemovalTask detects this stale data and removes it.

[Neo4jCsvPublisher](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/publisher/neo4j_csv_publisher.py) adds the attributes "published_tag" and "publisher_last_updated_epoch_ms" to every node and relation. You can use either of these two attributes to detect staleness and remove stale nodes or relations from the database.
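
As a quick sanity check, you can inspect these attributes directly with the official Neo4j Python driver. The snippet below is only a minimal sketch (not part of Databuilder); the endpoint and credentials are placeholders, and it simply prints the two publisher attributes for a few Table nodes.

    from neo4j import GraphDatabase

    # Placeholder connection details -- substitute your own endpoint and credentials.
    driver = GraphDatabase.driver('bolt://localhost:7687', auth=('neo4j', 'password'))

    with driver.session() as session:
        result = session.run(
            'MATCH (n:Table) '
            'RETURN n.published_tag AS published_tag, '
            '       n.publisher_last_updated_epoch_ms AS last_updated_epoch_ms '
            'LIMIT 5'
        )
        for record in result:
            print(record['published_tag'], record['last_updated_epoch_ms'])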

#### Using "published_tag" to remove stale data
Use *published_tag* to remove stale data when you can be certain that, once all ingestion has completed, any element with a non-matching tag is stale. For example, suppose you use the current date (or the execution date in Airflow) as the *published_tag*, "2020-03-31". Once Databuilder has ingested all tables and columns, every table node and column node should have "2020-03-31" as its *published_tag*. It is then safe to assume that table and column nodes with a different *published_tag* -- such as "2020-03-30" or "2020-02-10" -- have been deleted from the source metadata. You can use Neo4jStalenessRemovalTask to delete that stale data.

    from pyhocon import ConfigFactory

    from databuilder.job.job import DefaultJob
    from databuilder.task.neo4j_staleness_removal_task import Neo4jStalenessRemovalTask

    task = Neo4jStalenessRemovalTask()
    job_config_dict = {
        'job.identifier': 'remove_stale_data_job',
        'task.remove_stale_data.neo4j_endpoint': neo4j_endpoint,
        'task.remove_stale_data.neo4j_user': neo4j_user,
        'task.remove_stale_data.neo4j_password': neo4j_password,
        'task.remove_stale_data.staleness_max_pct': 10,
        'task.remove_stale_data.target_nodes': ['Table', 'Column'],
        'task.remove_stale_data.job_publish_tag': '2020-03-31'
    }
    job_config = ConfigFactory.from_dict(job_config_dict)
    job = DefaultJob(conf=job_config, task=task)
    job.launch()

Note that there is a protection mechanism, **staleness_max_pct**, that prevents your data from being wiped out when something is clearly wrong. **staleness_max_pct** first measures the proportion of elements that would be deleted, and if that proportion exceeds the threshold for any type (10% in the configuration above), the deletion is not executed and the task aborts.
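
If a single global threshold is too coarse, the task also accepts a per-label/per-type override, **staleness_max_pct_dict** (see [neo4j_staleness_removal_task.py](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/task/neo4j_staleness_removal_task.py)). The sketch below assumes the key follows the same `task.remove_stale_data.` prefix as the other options; the 30% figure for Column is only an illustration.

    task = Neo4jStalenessRemovalTask()
    job_config_dict = {
        'job.identifier': 'remove_stale_data_job',
        'task.remove_stale_data.neo4j_endpoint': neo4j_endpoint,
        'task.remove_stale_data.neo4j_user': neo4j_user,
        'task.remove_stale_data.neo4j_password': neo4j_password,
        # Global cap of 10%, but allow up to 30% of Column nodes to be deleted.
        'task.remove_stale_data.staleness_max_pct': 10,
        'task.remove_stale_data.staleness_max_pct_dict': {'Column': 30},
        'task.remove_stale_data.target_nodes': ['Table', 'Column'],
        'task.remove_stale_data.job_publish_tag': '2020-03-31'
    }
    job_config = ConfigFactory.from_dict(job_config_dict)
    job = DefaultJob(conf=job_config, task=task)
    job.launch()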

#### Using "publisher_last_updated_epoch_ms" to remove stale data
You can think of this approach as TTL-based eviction. It is particularly useful when there are multiple ingestion pipelines and you cannot be sure when all ingestion is done. In that case, you can still say that if a specific node or relation has not been published in the past 3 days, it is stale data.

    task = Neo4jStalenessRemovalTask()
    job_config_dict = {
        'job.identifier': 'remove_stale_data_job',
        'task.remove_stale_data.neo4j_endpoint': neo4j_endpoint,
        'task.remove_stale_data.neo4j_user': neo4j_user,
        'task.remove_stale_data.neo4j_password': neo4j_password,
        'task.remove_stale_data.staleness_max_pct': 10,
        'task.remove_stale_data.target_relations': ['READ', 'READ_BY'],
        'task.remove_stale_data.milliseconds_to_expire': 86400000 * 3
    }
    job_config = ConfigFactory.from_dict(job_config_dict)
    job = DefaultJob(conf=job_config, task=task)
    job.launch()

The configuration above deletes stale usage relations (READ, READ_BY) by removing any READ or READ_BY relation that has not been published in the past 3 days. If the number of elements to be removed exceeds 10% for any type, the task aborts without executing any deletion.
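
Also note that the task enforces a lower bound on the TTL: by default *milliseconds_to_expire* must be at least one day (**minimum_milliseconds_to_expire**), and the task raises an exception at init time otherwise. If you genuinely need a shorter TTL, the sketch below -- assuming the key uses the same `task.remove_stale_data.` prefix as the other options -- lowers both values together.

    job_config_dict = {
        'job.identifier': 'remove_stale_data_job',
        'task.remove_stale_data.neo4j_endpoint': neo4j_endpoint,
        'task.remove_stale_data.neo4j_user': neo4j_user,
        'task.remove_stale_data.neo4j_password': neo4j_password,
        'task.remove_stale_data.staleness_max_pct': 10,
        'task.remove_stale_data.target_relations': ['READ', 'READ_BY'],
        # A 6-hour TTL; without also lowering the minimum, init() would raise.
        'task.remove_stale_data.milliseconds_to_expire': 3600000 * 6,
        'task.remove_stale_data.minimum_milliseconds_to_expire': 3600000 * 6
    }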

#### Dry run
Deletion is always risky, and it is better to perform a dry run before putting this into action. Use dry run mode to see what kind of Cypher queries would be executed.

    task = Neo4jStalenessRemovalTask()
    job_config_dict = {
        'job.identifier': 'remove_stale_data_job',
        'task.remove_stale_data.neo4j_endpoint': neo4j_endpoint,
        'task.remove_stale_data.neo4j_user': neo4j_user,
        'task.remove_stale_data.neo4j_password': neo4j_password,
        'task.remove_stale_data.staleness_max_pct': 10,
        'task.remove_stale_data.target_relations': ['READ', 'READ_BY'],
        'task.remove_stale_data.milliseconds_to_expire': 86400000 * 3,
        'task.remove_stale_data.dry_run': True
    }
    job_config = ConfigFactory.from_dict(job_config_dict)
    job = DefaultJob(conf=job_config, task=task)
    job.launch()
127 changes: 84 additions & 43 deletions databuilder/task/neo4j_staleness_removal_task.py
@@ -1,15 +1,15 @@
import logging
import textwrap
import time

from neo4j.v1 import GraphDatabase, BoltStatementResult # noqa: F401
from neo4j import GraphDatabase # noqa: F401
from pyhocon import ConfigFactory # noqa: F401
from pyhocon import ConfigTree # noqa: F401
from typing import Dict, Iterable, Any # noqa: F401

from databuilder import Scoped
from databuilder.task.base_task import Task # noqa: F401
from databuilder.publisher.neo4j_csv_publisher import JOB_PUBLISH_TAG

from databuilder.task.base_task import Task # noqa: F401

# A end point for Neo4j e.g: bolt://localhost:9999
NEO4J_END_POINT_KEY = 'neo4j_endpoint'
@@ -20,20 +20,28 @@
TARGET_NODES = "target_nodes"
TARGET_RELATIONS = "target_relations"
BATCH_SIZE = "batch_size"
DRY_RUN = "dry_run"
# Staleness max percentage. Safety net to prevent majority of data being deleted.
STALENESS_MAX_PCT = "staleness_max_pct"
# Staleness max percentage per LABEL/TYPE. Safety net to prevent majority of data being deleted.
STALENESS_PCT_MAX_DICT = "staleness_max_pct_dict"
# Using this milliseconds and published timestamp to determine staleness
MS_TO_EXPIRE = "milliseconds_to_expire"
MIN_MS_TO_EXPIRE = "minimum_milliseconds_to_expire"

DEFAULT_CONFIG = ConfigFactory.from_dict({BATCH_SIZE: 100,
NEO4J_MAX_CONN_LIFE_TIME_SEC: 50,
STALENESS_MAX_PCT: 5,
TARGET_NODES: [],
TARGET_RELATIONS: [],
STALENESS_PCT_MAX_DICT: {}})
STALENESS_PCT_MAX_DICT: {},
MIN_MS_TO_EXPIRE: 86400000,
DRY_RUN: False})

LOGGER = logging.getLogger(__name__)

MARKER_VAR_NAME = 'marker'


class Neo4jStalenessRemovalTask(Task):
"""
@@ -55,22 +63,33 @@ def get_scope(self):

def init(self, conf):
# type: (ConfigTree) -> None
conf = Scoped.get_scoped_conf(conf, self.get_scope())\
.with_fallback(conf)\
conf = Scoped.get_scoped_conf(conf, self.get_scope()) \
.with_fallback(conf) \
.with_fallback(DEFAULT_CONFIG)
self.target_nodes = set(conf.get_list(TARGET_NODES))
self.target_relations = set(conf.get_list(TARGET_RELATIONS))
self.batch_size = conf.get_int(BATCH_SIZE)
self.dry_run = conf.get_bool(DRY_RUN)
self.staleness_pct = conf.get_int(STALENESS_MAX_PCT)
self.staleness_pct_dict = conf.get(STALENESS_PCT_MAX_DICT)
self.publish_tag = conf.get_string(JOB_PUBLISH_TAG)

if JOB_PUBLISH_TAG in conf and MS_TO_EXPIRE in conf:
raise Exception('Cannot have both {} and {} in job config'.format(JOB_PUBLISH_TAG, MS_TO_EXPIRE))

self.ms_to_expire = None
if MS_TO_EXPIRE in conf:
self.ms_to_expire = conf.get_int(MS_TO_EXPIRE)
if self.ms_to_expire < conf.get_int(MIN_MS_TO_EXPIRE):
raise Exception('{} is too small'.format(MS_TO_EXPIRE))
self.marker = '(timestamp() - {})'.format(conf.get_int(MS_TO_EXPIRE))
else:
self.marker = conf.get_string(JOB_PUBLISH_TAG)

self._driver = \
GraphDatabase.driver(conf.get_string(NEO4J_END_POINT_KEY),
max_connection_life_time=conf.get_int(NEO4J_MAX_CONN_LIFE_TIME_SEC),
auth=(conf.get_string(NEO4J_USER), conf.get_string(NEO4J_PASSWORD)))

self._session = self._driver.session()

def run(self):
# type: () -> None
"""
@@ -94,26 +113,39 @@ def validate(self):
self._validate_relation_staleness_pct()

def _delete_stale_nodes(self):
statement = """
MATCH (n:{type})
WHERE n.published_tag <> $published_tag
OR NOT EXISTS(n.published_tag)
statement = textwrap.dedent("""
MATCH (n:{{type}})
WHERE {}
WITH n LIMIT $batch_size
DETACH DELETE (n)
RETURN COUNT(*) as count;
""")
self._batch_delete(statement=self._decorate_staleness(statement), targets=self.target_nodes)

def _decorate_staleness(self, statement):
"""
Append where clause to the Cypher statement depends on which field to be used to expire stale data.
:param statement:
:return:
"""
self._batch_delete(statement=statement, targets=self.target_nodes)
if self.ms_to_expire:
return statement.format(textwrap.dedent("""
n.publisher_last_updated_epoch_ms < ${marker}
OR NOT EXISTS(n.publisher_last_updated_epoch_ms)""".format(marker=MARKER_VAR_NAME)))

return statement.format(textwrap.dedent("""
n.published_tag <> ${marker}
OR NOT EXISTS(n.published_tag)""".format(marker=MARKER_VAR_NAME)))

def _delete_stale_relations(self):
statement = """
MATCH ()-[r:{type}]-()
WHERE r.published_tag <> $published_tag
OR NOT EXISTS(r.published_tag)
WITH r LIMIT $batch_size
DELETE r
statement = textwrap.dedent("""
MATCH ()-[n:{{type}}]-()
WHERE {}
WITH n LIMIT $batch_size
DELETE n
RETURN count(*) as count;
"""
self._batch_delete(statement=statement, targets=self.target_relations)
""")
self._batch_delete(statement=self._decorate_staleness(statement), targets=self.target_relations)

def _batch_delete(self, statement, targets):
"""
@@ -126,10 +158,12 @@ def _batch_delete(self, statement, targets):
LOGGER.info('Deleting stale data of {} with batch size {}'.format(t, self.batch_size))
total_count = 0
while True:
result = self._execute_cypher_query(statement=statement.format(type=t),
param_dict={'batch_size': self.batch_size,
'published_tag': self.publish_tag}).single()
count = result['count']
results = self._execute_cypher_query(statement=statement.format(type=t),
param_dict={'batch_size': self.batch_size,
MARKER_VAR_NAME: self.marker},
dry_run=self.dry_run)
record = next(iter(results), None)
count = record['count'] if record else 0
total_count = total_count + count
if count == 0:
break
@@ -160,52 +194,59 @@ def _validate_staleness_pct(self, total_records, stale_records, types):
def _validate_node_staleness_pct(self):
# type: () -> None

total_nodes_statement = """
total_nodes_statement = textwrap.dedent("""
MATCH (n)
WITH DISTINCT labels(n) as node, count(*) as count
RETURN head(node) as type, count
"""
""")

stale_nodes_statement = """
stale_nodes_statement = textwrap.dedent("""
MATCH (n)
WHERE n.published_tag <> $published_tag
OR NOT EXISTS(n.published_tag)
WHERE {}
WITH DISTINCT labels(n) as node, count(*) as count
RETURN head(node) as type, count
"""
""")

stale_nodes_statement = textwrap.dedent(self._decorate_staleness(stale_nodes_statement))

total_records = self._execute_cypher_query(statement=total_nodes_statement)
stale_records = self._execute_cypher_query(statement=stale_nodes_statement,
param_dict={'published_tag': self.publish_tag})
param_dict={MARKER_VAR_NAME: self.marker})
self._validate_staleness_pct(total_records=total_records,
stale_records=stale_records,
types=self.target_nodes)

def _validate_relation_staleness_pct(self):
# type: () -> None
total_relations_statement = """
total_relations_statement = textwrap.dedent("""
MATCH ()-[r]-()
RETURN type(r) as type, count(*) as count;
"""
""")

stale_relations_statement = """
MATCH ()-[r]-()
WHERE r.published_tag <> $published_tag
OR NOT EXISTS(r.published_tag)
RETURN type(r) as type, count(*) as count
"""
stale_relations_statement = textwrap.dedent("""
MATCH ()-[n]-()
WHERE {}
RETURN type(n) as type, count(*) as count
""")

stale_relations_statement = textwrap.dedent(self._decorate_staleness(stale_relations_statement))

total_records = self._execute_cypher_query(statement=total_relations_statement)
stale_records = self._execute_cypher_query(statement=stale_relations_statement,
param_dict={'published_tag': self.publish_tag})
param_dict={MARKER_VAR_NAME: self.marker})
self._validate_staleness_pct(total_records=total_records,
stale_records=stale_records,
types=self.target_relations)

def _execute_cypher_query(self, statement, param_dict={}):
def _execute_cypher_query(self, statement, param_dict={}, dry_run=False):
# type: (str, Dict[str, Any]) -> Iterable[Dict[str, Any]]
LOGGER.info('Executing Cypher query: {statement} with params {params}: '.format(statement=statement,
params=param_dict))

if dry_run:
LOGGER.info('Skipping for it is a dryrun')
return []

start = time.time()
try:
with self._driver.session() as session:
2 changes: 1 addition & 1 deletion setup.py
@@ -2,7 +2,7 @@
from setuptools import setup, find_packages


__version__ = '2.4.3'
__version__ = '2.5.0'

requirements_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'requirements.txt')
with open(requirements_path) as requirements_file: