Skip to content

Commit

Permalink
refactor: Compatibility with Neo4J 4.0.4 (4.x in general should work) (
Browse files Browse the repository at this point in the history
…#270)

* Compatibility with latest Neo4J 4.0.4

Starting with Neo4J v4, it no longer auto generates SSL/TLS
certificates if they aren't supplied. To keep the example
working, I needed config to specify whether the connection
to Neo4J should be encrypted or not.

It defaults to False, at the moment, so I think most
people would need to add an explicit True to their config.

* Satisfy warning and fix test failures.

The neo4j driver is now supposed to be imported from 'neo4j'; the
older 'neo4j.v1' import path is deprecated. I was going to leave this, but then
saw there are deprecation warnings.

Several unit tests were failing because the new 'neo4j_encrypted'
key wasn't included in the configs. I believe it makes sense to
include 'NEO4J_ENCRYPTED: True' as a default anyway. This should
stop people with existing configs from having to update them.

* Add a config parameter for SSL cert validation.
  • Loading branch information
davcamer authored Jun 4, 2020
1 parent 8f18faf commit 29bfcea
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 14 deletions.
12 changes: 8 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -351,11 +351,12 @@ An extractor that basically get current timestamp and passes it GenericExtractor
An extractor that extracts records from Neo4j based on a provided [Cypher query](https://neo4j.com/developer/cypher/ "Cypher query"). One example is to extract data from Neo4j so that it can be transformed and published to Elasticsearch.
```python
job_config = ConfigFactory.from_dict({
'extractor.neo4j.{}'.format(Neo4jExtractor.CYPHER_QUERY_CONFIG_KEY)': cypher_query,
'extractor.neo4j.{}'.format(Neo4jExtractor.CYPHER_QUERY_CONFIG_KEY): cypher_query,
'extractor.neo4j.{}'.format(Neo4jExtractor.GRAPH_URL_CONFIG_KEY): neo4j_endpoint,
'extractor.neo4j.{}'.format(Neo4jExtractor.MODEL_CLASS_CONFIG_KEY): 'package.module.class_name',
'extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_USER): neo4j_user,
'extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_PW): neo4j_password})
'extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_PW): neo4j_password,
'extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_ENCRYPTED): True})
job = DefaultJob(
conf=job_config,
task=DefaultTask(
Expand All @@ -370,7 +371,8 @@ job_config = ConfigFactory.from_dict({
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.GRAPH_URL_CONFIG_KEY): neo4j_endpoint,
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.MODEL_CLASS_CONFIG_KEY): 'databuilder.models.neo4j_data.Neo4jDataResult',
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_USER): neo4j_user,
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_PW): neo4j_password})
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_PW): neo4j_password,
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_ENCRYPTED): False})
job = DefaultJob(
conf=job_config,
task=DefaultTask(
Expand Down Expand Up @@ -444,6 +446,7 @@ job_config = ConfigFactory.from_dict({
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_END_POINT_KEY): neo4j_endpoint,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_USER): neo4j_user,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_PASSWORD): neo4j_password,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_ENCRYPTED): True,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_CREATE_ONLY_NODES): [DESCRIPTION_NODE_LABEL],
'publisher.neo4j.{}'.format(neo4j_csv_publisher.JOB_PUBLISH_TAG): job_publish_tag
})
Expand Down Expand Up @@ -698,7 +701,8 @@ job_config = ConfigFactory.from_dict({
'publisher.neo4j.{}'.format(neo4j_csv_publisher.RELATION_FILES_DIR): relationship_files_folder,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_END_POINT_KEY): neo4j_endpoint,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_USER): neo4j_user,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_PASSWORD): neo4j_password,})
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_PASSWORD): neo4j_password,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_ENCRYPTED): True})

job = DefaultJob(
conf=job_config,
Expand Down
17 changes: 14 additions & 3 deletions databuilder/extractor/neo4j_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
from typing import Any, Iterator, Union # noqa: F401

from pyhocon import ConfigTree, ConfigFactory # noqa: F401
from neo4j.v1 import GraphDatabase
from neo4j import GraphDatabase
import neo4j

from databuilder.extractor.base_extractor import Extractor

Expand All @@ -20,8 +21,14 @@ class Neo4jExtractor(Extractor):
NEO4J_AUTH_USER = 'neo4j_auth_user'
NEO4J_AUTH_PW = 'neo4j_auth_pw'
NEO4J_MAX_CONN_LIFE_TIME_SEC = 'neo4j_max_conn_life_time_sec'
NEO4J_ENCRYPTED = 'neo4j_encrypted'
"""NEO4J_ENCRYPTED is a boolean indicating whether to use SSL/TLS when connecting."""
NEO4J_VALIDATE_SSL = 'neo4j_validate_ssl'
"""NEO4J_VALIDATE_SSL is a boolean indicating whether to validate the server's SSL/TLS cert against system CAs."""

DEFAULT_CONFIG = ConfigFactory.from_dict({NEO4J_MAX_CONN_LIFE_TIME_SEC: 50, })
DEFAULT_CONFIG = ConfigFactory.from_dict({NEO4J_MAX_CONN_LIFE_TIME_SEC: 50,
NEO4J_ENCRYPTED: True,
NEO4J_VALIDATE_SSL: False})

def init(self, conf):
# type: (ConfigTree) -> None
Expand Down Expand Up @@ -57,11 +64,15 @@ def _get_driver(self):
"""
Create a Neo4j connection to Database
"""
trust = neo4j.TRUST_SYSTEM_CA_SIGNED_CERTIFICATES if self.conf.get_bool(Neo4jExtractor.NEO4J_VALIDATE_SSL) \
else neo4j.TRUST_ALL_CERTIFICATES
return GraphDatabase.driver(self.graph_url,
max_connection_life_time=self.conf.get_int(
Neo4jExtractor.NEO4J_MAX_CONN_LIFE_TIME_SEC),
auth=(self.conf.get_string(Neo4jExtractor.NEO4J_AUTH_USER),
self.conf.get_string(Neo4jExtractor.NEO4J_AUTH_PW)))
self.conf.get_string(Neo4jExtractor.NEO4J_AUTH_PW)),
encrypted=self.conf.get_bool(Neo4jExtractor.NEO4J_ENCRYPTED),
trust=trust)

def _execute_query(self, tx):
# type: (Any) -> Any
Expand Down
25 changes: 21 additions & 4 deletions databuilder/publisher/neo4j_csv_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
from string import Template

import six
from neo4j.v1 import GraphDatabase, Transaction # noqa: F401
from neo4j import GraphDatabase, Transaction # noqa: F401
import neo4j
from neo4j.exceptions import CypherError
from pyhocon import ConfigFactory # noqa: F401
from pyhocon import ConfigTree # noqa: F401
from typing import Set, List # noqa: F401
Expand Down Expand Up @@ -44,6 +46,10 @@

NEO4J_USER = 'neo4j_user'
NEO4J_PASSWORD = 'neo4j_password'
NEO4J_ENCRYPTED = 'neo4j_encrypted'
"""NEO4J_ENCRYPTED is a boolean indicating whether to use SSL/TLS when connecting."""
NEO4J_VALIDATE_SSL = 'neo4j_validate_ssl'
"""NEO4J_VALIDATE_SSL is a boolean indicating whether to validate the server's SSL/TLS cert against system CAs."""

# This will be used to provide unique tag to the node and relationship
JOB_PUBLISH_TAG = 'job_publish_tag'
Expand Down Expand Up @@ -88,6 +94,8 @@
NEO4J_PROGRESS_REPORT_FREQUENCY: 500,
NEO4J_RELATIONSHIP_CREATION_CONFIRM: False,
NEO4J_MAX_CONN_LIFE_TIME_SEC: 50,
NEO4J_ENCRYPTED: True,
NEO4J_VALIDATE_SSL: False,
RELATION_PREPROCESSOR: NoopRelationPreprocessor()})

NODE_MERGE_TEMPLATE = Template("""MERGE (node:$LABEL {key: '${KEY}'})
Expand Down Expand Up @@ -133,10 +141,14 @@ def init(self, conf):
self._relation_files = self._list_files(conf, RELATION_FILES_DIR)
self._relation_files_iter = iter(self._relation_files)

trust = neo4j.TRUST_SYSTEM_CA_SIGNED_CERTIFICATES if conf.get_bool(NEO4J_VALIDATE_SSL) \
else neo4j.TRUST_ALL_CERTIFICATES
self._driver = \
GraphDatabase.driver(conf.get_string(NEO4J_END_POINT_KEY),
max_connection_life_time=conf.get_int(NEO4J_MAX_CONN_LIFE_TIME_SEC),
auth=(conf.get_string(NEO4J_USER), conf.get_string(NEO4J_PASSWORD)))
auth=(conf.get_string(NEO4J_USER), conf.get_string(NEO4J_PASSWORD)),
encrypted=conf.get_bool(NEO4J_ENCRYPTED),
trust=trust)
self._transaction_size = conf.get_int(NEO4J_TRANSCATION_SIZE)
self._session = self._driver.session()
self._confirm_rel_created = conf.get_bool(NEO4J_RELATIONSHIP_CREATION_CONFIRM)
Expand Down Expand Up @@ -451,12 +463,17 @@ def _try_create_index(self,
# type: (str) -> None
"""
For any label seen first time for this publisher it will try to create unique index.
There's no side effect on Neo4j side issuing index creation for existing index.
Neo4j ignores a second creation in 3.x, but raises an error in 4.x.
:param label:
:return:
"""
stmt = CREATE_UNIQUE_INDEX_TEMPLATE.substitute(LABEL=label)
LOGGER.info('Trying to create index for label {label} if not exist: {stmt}'.format(label=label,
stmt=stmt))
with self._driver.session() as session:
session.run(stmt)
try:
session.run(stmt)
except CypherError as e:
if 'An equivalent constraint already exists' not in e.__str__():
raise
# Else, swallow the exception, to make this function idempotent.
13 changes: 12 additions & 1 deletion databuilder/task/neo4j_staleness_removal_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import time

from neo4j import GraphDatabase # noqa: F401
import neo4j
from pyhocon import ConfigFactory # noqa: F401
from pyhocon import ConfigTree # noqa: F401
from typing import Dict, Iterable, Any # noqa: F401
Expand All @@ -16,6 +17,10 @@
NEO4J_MAX_CONN_LIFE_TIME_SEC = 'neo4j_max_conn_life_time_sec'
NEO4J_USER = 'neo4j_user'
NEO4J_PASSWORD = 'neo4j_password'
NEO4J_ENCRYPTED = 'neo4j_encrypted'
"""NEO4J_ENCRYPTED is a boolean indicating whether to use SSL/TLS when connecting."""
NEO4J_VALIDATE_SSL = 'neo4j_validate_ssl'
"""NEO4J_VALIDATE_SSL is a boolean indicating whether to validate the server's SSL/TLS cert against system CAs."""

TARGET_NODES = "target_nodes"
TARGET_RELATIONS = "target_relations"
Expand All @@ -31,6 +36,8 @@

DEFAULT_CONFIG = ConfigFactory.from_dict({BATCH_SIZE: 100,
NEO4J_MAX_CONN_LIFE_TIME_SEC: 50,
NEO4J_ENCRYPTED: True,
NEO4J_VALIDATE_SSL: False,
STALENESS_MAX_PCT: 5,
TARGET_NODES: [],
TARGET_RELATIONS: [],
Expand Down Expand Up @@ -85,10 +92,14 @@ def init(self, conf):
else:
self.marker = conf.get_string(JOB_PUBLISH_TAG)

trust = neo4j.TRUST_SYSTEM_CA_SIGNED_CERTIFICATES if conf.get_bool(NEO4J_VALIDATE_SSL) \
else neo4j.TRUST_ALL_CERTIFICATES
self._driver = \
GraphDatabase.driver(conf.get_string(NEO4J_END_POINT_KEY),
max_connection_life_time=conf.get_int(NEO4J_MAX_CONN_LIFE_TIME_SEC),
auth=(conf.get_string(NEO4J_USER), conf.get_string(NEO4J_PASSWORD)))
auth=(conf.get_string(NEO4J_USER), conf.get_string(NEO4J_PASSWORD)),
encrypted=conf.get_bool(NEO4J_ENCRYPTED),
trust=trust)

def run(self):
# type: () -> None
Expand Down
2 changes: 2 additions & 0 deletions docs/dashboard_ingestion_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ job_config = ConfigFactory.from_dict({
neo4j_user,
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_PW):
neo4j_password,
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_ENCRYPTED):
False,
'extractor.search_data.{}'.format(Neo4jSearchDataExtractor.CYPHER_QUERY_CONFIG_KEY):
Neo4jSearchDataExtractor.DEFAULT_NEO4J_DASHBOARD_CYPHER_QUERY,
'extractor.search_data.{}'.format(neo4j_csv_publisher.JOB_PUBLISH_TAG):
Expand Down
4 changes: 4 additions & 0 deletions example/scripts/sample_data_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ def run_csv_job(file_loc, table_name, model):
'publisher.neo4j.neo4j_endpoint': neo4j_endpoint,
'publisher.neo4j.neo4j_user': neo4j_user,
'publisher.neo4j.neo4j_password': neo4j_password,
'publisher.neo4j.neo4j_encrypted': False,
'publisher.neo4j.job_publish_tag': 'unique_tag', # should use unique tag here like {ds}
})

Expand Down Expand Up @@ -120,6 +121,7 @@ def run_table_column_job(table_path, column_path):
'publisher.neo4j.neo4j_endpoint': neo4j_endpoint,
'publisher.neo4j.neo4j_user': neo4j_user,
'publisher.neo4j.neo4j_password': neo4j_password,
'publisher.neo4j.neo4j_encrypted': False,
'publisher.neo4j.job_publish_tag': 'unique_tag', # should use unique tag here like {ds}
})
job = DefaultJob(conf=job_config,
Expand Down Expand Up @@ -148,6 +150,7 @@ def create_last_updated_job():
'publisher.neo4j.neo4j_endpoint': neo4j_endpoint,
'publisher.neo4j.neo4j_user': neo4j_user,
'publisher.neo4j.neo4j_password': neo4j_password,
'publisher.neo4j.neo4j_encrypted': False,
'publisher.neo4j.job_publish_tag': 'unique_lastupdated_tag', # should use unique tag here like {ds}
})

Expand Down Expand Up @@ -189,6 +192,7 @@ def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index
'extractor.search_data.extractor.neo4j.model_class': model_name,
'extractor.search_data.extractor.neo4j.neo4j_auth_user': neo4j_user,
'extractor.search_data.extractor.neo4j.neo4j_auth_pw': neo4j_password,
'extractor.search_data.extractor.neo4j.neo4j_encrypted': False,
'loader.filesystem.elasticsearch.file_path': extracted_search_data_path,
'loader.filesystem.elasticsearch.mode': 'w',
'publisher.elasticsearch.file_path': extracted_search_data_path,
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/publisher/test_neo4j_csv_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import uuid

from mock import patch, MagicMock
from neo4j.v1 import GraphDatabase
from neo4j import GraphDatabase
from pyhocon import ConfigFactory

from databuilder.publisher import neo4j_csv_publisher
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/task/test_neo4j_staleness_removal_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import unittest

from mock import patch
from neo4j.v1 import GraphDatabase
from neo4j import GraphDatabase
from pyhocon import ConfigFactory

from databuilder.publisher import neo4j_csv_publisher
Expand Down

0 comments on commit 29bfcea

Please sign in to comment.