Datapusher COPY mode #221

Closed
wants to merge 7 commits
127 changes: 104 additions & 23 deletions datapusher/jobs.py
@@ -22,6 +22,10 @@
import ckanserviceprovider.job as job
import ckanserviceprovider.util as util
from ckanserviceprovider import web
import psycopg2
import csv
import six
from pathlib import Path

if locale.getdefaultlocale()[0]:
lang, encoding = locale.getdefaultlocale()
@@ -33,6 +37,8 @@
CHUNK_SIZE = web.app.config.get('CHUNK_SIZE') or 16384
CHUNK_INSERT_ROWS = web.app.config.get('CHUNK_INSERT_ROWS') or 250
DOWNLOAD_TIMEOUT = web.app.config.get('DOWNLOAD_TIMEOUT') or 30
COPY_MODE_SIZE = web.app.config.get('COPY_MODE_SIZE') or 0
COPY_WRITE_ENGINE_URL = web.app.config.get('COPY_WRITE_ENGINE_URL')

if web.app.config.get('SSL_VERIFY') in ['False', 'FALSE', '0', False, 0]:
SSL_VERIFY = False
@@ -125,7 +131,8 @@ def get_url(action, ckan_url):
ckan_url=ckan_url, action=action)


def check_response(response, request_url, who, good_status=(201, 200), ignore_no_success=False):
def check_response(response, request_url, who, good_status=(201, 200),
ignore_no_success=False):
"""
Checks the response and raises exceptions if something went terribly wrong

@@ -185,7 +192,7 @@ def chunky(items, num_items_per_chunk):


class DatastoreEncoder(json.JSONEncoder):
# Custon JSON encoder
# Custom JSON encoder
def default(self, obj):
if isinstance(obj, datetime.datetime):
return obj.isoformat()
@@ -217,7 +224,7 @@ def datastore_resource_exists(resource_id, api_key, ckan_url):
response = requests.post(search_url,
verify=SSL_VERIFY,
data=json.dumps({'id': resource_id,
'limit': 0}),
'limit': 0}),
headers={'Content-Type': 'application/json',
'Authorization': api_key}
)
@@ -235,8 +242,8 @@ def datastore_resource_exists(resource_id, api_key, ckan_url):
'Error getting datastore resource ({!s}).'.format(e))


def send_resource_to_datastore(resource, headers, records,
is_it_the_last_chunk, api_key, ckan_url):
def send_resource_to_datastore(resource, headers, api_key, ckan_url,
records, is_it_the_last_chunk, ):
"""
Stores records in CKAN datastore
"""
@@ -292,7 +299,7 @@ def get_resource(resource_id, ckan_url, api_key):


def validate_input(input):
# Especially validate metdata which is provided by the user
# Especially validate metadata which is provided by the user
if 'metadata' not in input:
raise util.JobError('Metadata missing')

@@ -335,7 +342,7 @@ def push_to_datastore(task_id, input, dry_run=False):

try:
resource = get_resource(resource_id, ckan_url, api_key)
except util.JobError as e:
except util.JobError:
# try again in 5 seconds just incase CKAN is slow at adding resource
time.sleep(5)
resource = get_resource(resource_id, ckan_url, api_key)
@@ -379,7 +386,7 @@ def push_to_datastore(task_id, input, dry_run=False):
except ValueError:
pass

tmp = tempfile.TemporaryFile()
tmp = tempfile.NamedTemporaryFile()
length = 0
m = hashlib.md5()
for chunk in response.iter_content(CHUNK_SIZE):
@@ -406,8 +413,7 @@
file_hash = m.hexdigest()
tmp.seek(0)

if (resource.get('hash') == file_hash
and not data.get('ignore_hash')):
if (resource.get('hash') == file_hash and not data.get('ignore_hash')):
logger.info("The file hash hasn't changed: {hash}.".format(
hash=file_hash))
return
@@ -421,7 +427,8 @@
tmp.seek(0)
try:
format = resource.get('format')
table_set = messytables.any_tableset(tmp, mimetype=format, extension=format)
table_set = messytables.any_tableset(tmp, mimetype=format,
extension=format)
except:
raise util.JobError(e)

@@ -475,7 +482,7 @@ def row_iterator():
result = row_iterator()

'''
Delete existing datstore resource before proceeding. Otherwise
Delete existing datastore resource before proceeding. Otherwise
'datastore_create' will append to the existing datastore. And if
the fields have significantly changed, it may also fail.
'''
@@ -503,17 +510,91 @@ def row_iterator():
if dry_run:
return headers_dicts, result

count = 0
for i, chunk in enumerate(chunky(result, CHUNK_INSERT_ROWS)):
records, is_it_the_last_chunk = chunk
count += len(records)
logger.info('Saving chunk {number} {is_last}'.format(
number=i, is_last='(last)' if is_it_the_last_chunk else ''))
send_resource_to_datastore(resource, headers_dicts, records,
is_it_the_last_chunk, api_key, ckan_url)
fileSize = Path(tmp.name).stat().st_size

# If COPY_MODE_SIZE is zero, or the filesize is less than the
# COPY_MODE_SIZE threshold in bytes, push thru Datastore API.
# Otherwise, use COPY if we have a COPY_WRITE_ENGINE_URL
if not COPY_MODE_SIZE or not COPY_WRITE_ENGINE_URL or fileSize < COPY_MODE_SIZE:
count = 0
notify_time = timer_start = time.perf_counter()
for i, chunk in enumerate(chunky(result, CHUNK_INSERT_ROWS)):
records, is_it_the_last_chunk = chunk
count += len(records)
if is_it_the_last_chunk or notify_time < time.perf_counter():
logger.info('Saving chunk {number} {is_last}- {n} rows - {elapsed} seconds'.format(
number=i, is_last='(last)' if is_it_the_last_chunk else '',
n='{:,}'.format(count), elapsed='{:,.2f}'.format(time.perf_counter() - timer_start)))
notify_time += 20
send_resource_to_datastore(resource, headers_dicts, api_key,
ckan_url, records, is_it_the_last_chunk)

elapsed = time.perf_counter() - timer_start
logger.info('Successfully pushed {n} entries to "{res_id}" in {elapsed} seconds.'.format(
n='{:,}'.format(count), res_id=resource_id, elapsed='{:,.2f}'.format(elapsed)))

if data.get('set_url_type', False):
update_resource(resource, api_key, ckan_url)
else:
# for larger files, use Postgres COPY as it's much faster
logger.info('Copying to database...')
timer_start = time.perf_counter()

# first, let's create an empty datastore table w/ guessed types
send_resource_to_datastore(resource, headers_dicts, api_key, ckan_url,
records=None, is_it_the_last_chunk=False)

# Guess the delimiter used in the file for copy
with open(tmp.name, 'rb') as f:
header_line = f.readline()
try:
sniffer = csv.Sniffer()

Review comment: If I'm understanding this correctly, you're using the file as a raw CSV to push to the database, relying on the database's native interpretation of the datatypes/nulls/etc. for conversion. This is only going to work for files that have come in as CSV, not ones that are uploaded as .xls/ods and have been interpreted/converted by Messytables. The only criterion for legacy vs. COPY is file size, so some previously handled files won't be handled anymore.

Reply (contributor author): @EricSoroos, this is an old PR and I have since created a fork of Datapusher (https://github.com/dathere/datapusher-plus) that also supports xls/ods files, always uses Postgres COPY, drops the legacy datapusher code, and replaces messytables with qsv.

Would be interested in your feedback.
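
To illustrate the point raised in the review thread above, here is a minimal, hypothetical sketch (not part of this PR) of how rows that Messytables has already parsed, for example from an .xls/.ods upload, could be re-serialized to CSV in memory and still pushed through psycopg2's COPY. The `headers_dicts` shape and the chunked-rows shape are borrowed from jobs.py; the function name and everything else is illustrative only.

```python
import csv
import io

import psycopg2


def copy_parsed_rows(engine_url, resource_id, headers_dicts, row_chunks):
    """Illustrative sketch: stream already-parsed rows into Postgres via COPY.

    row_chunks is assumed to yield (records, is_last_chunk) pairs shaped like
    the output of chunky(row_iterator(), CHUNK_INSERT_ROWS) in jobs.py, where
    each record is a dict keyed by the datastore column id.
    """
    column_names = [h['id'] for h in headers_dicts]

    # Re-serialize the parsed rows to an in-memory CSV buffer. csv.writer
    # emits None as an empty, unquoted field, which COPY ... FORMAT CSV
    # reads back as NULL.
    buf = io.StringIO()
    writer = csv.writer(buf)
    for records, _is_last_chunk in row_chunks:
        for record in records:
            writer.writerow([record.get(col) for col in column_names])
    buf.seek(0)

    copy_sql = 'COPY "{res}" ({cols}) FROM STDIN WITH (FORMAT CSV)'.format(
        res=resource_id,
        cols=', '.join('"{}"'.format(c) for c in column_names))

    with psycopg2.connect(engine_url) as connection:
        with connection.cursor() as cursor:
            cursor.copy_expert(copy_sql, buf)
    connection.close()
```

The COPY branch in this PR instead streams the uploaded file straight from disk, which avoids holding the rows in memory but, as noted above, only works when the source is already a CSV.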

delimiter = sniffer.sniff(six.ensure_text(header_line)).delimiter
except csv.Error:
logger.warning('Could not determine delimiter, using ","')
delimiter = ','

rowcount = 0
try:
raw_connection = psycopg2.connect(COPY_WRITE_ENGINE_URL)
except psycopg2.Error as e:
logger.warning(str(e))
else:
cur = raw_connection.cursor()
# truncate table to use copy freeze option and further increase
# performance as there is no need for WAL logs to be maintained
# https://www.postgresql.org/docs/9.1/populate.html#POPULATE-COPY-FROM
cur.execute('TRUNCATE TABLE \"{resource_id}\";'.format(resource_id=resource_id))

copy_sql = ("COPY \"{resource_id}\" ({column_names}) FROM STDIN "
"WITH (DELIMITER '{delimiter}', FORMAT CSV, FREEZE 1, "
"HEADER 1, ENCODING 'UTF8');").format(
resource_id=resource_id,
column_names=', '.join(['"{}"'.format(h['id'])
for h in headers_dicts]),
delimiter=delimiter)
logger.info(copy_sql)
with open(tmp.name, 'rb') as f:
try:
cur.copy_expert(copy_sql, f)
except psycopg2.Error as e:
logger.warning(str(e))
else:
rowcount = cur.rowcount

logger.info('Successfully pushed {n} entries to "{res_id}".'.format(
n=count, res_id=resource_id))
raw_connection.commit()
# this is needed to issue a VACUUM ANALYZE
raw_connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cur = raw_connection.cursor()
logger.info('Vacuum Analyzing table...')
cur.execute('VACUUM ANALYZE \"{resource_id}\";'.format(resource_id=resource_id))
raw_connection.close()

if data.get('set_url_type', False):
elapsed = time.perf_counter() - timer_start
logger.info('...copying done. Copied {n} entries to "{res_id}" in {elapsed} seconds.'.format(
n='{:,}'.format(rowcount), res_id=resource_id, elapsed='{:,.2f}'.format(elapsed)))

resource['datastore_active'] = True
update_resource(resource, api_key, ckan_url)

tmp.close() # close temporary file
2 changes: 2 additions & 0 deletions requirements.txt
@@ -4,3 +4,5 @@ html5lib==1.0.1
messytables==0.15.2
certifi
requests[security]==2.24.0
psycopg2
six
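
For reference, a hypothetical configuration sketch for the two new options this PR reads from web.app.config, COPY_MODE_SIZE and COPY_WRITE_ENGINE_URL. The option names come from jobs.py; the settings-file location and the example values are assumptions, not part of the PR.

```python
# Hypothetical excerpt from a datapusher settings module (for example the file
# that ckanserviceprovider loads via the JOB_CONFIG environment variable).

# Files at or above this many bytes are loaded with Postgres COPY instead of
# the Datastore API; 0 (the default) keeps the legacy API-only behaviour.
COPY_MODE_SIZE = 50 * 1024 * 1024  # example threshold: 50 MB

# Write connection URL for the datastore database used by the COPY path.
# COPY mode is skipped entirely if this is left unset.
COPY_WRITE_ENGINE_URL = 'postgresql://ckan_default:pass@localhost/datastore_default'
```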