From 21d344f15e620b7c28a41114bef53a04cc43d0be Mon Sep 17 00:00:00 2001 From: Joel Natividad <1980690+jqnatividad@users.noreply.github.com> Date: Sat, 2 Jan 2021 11:08:10 -0500 Subject: [PATCH 1/7] COPY_MODE use postgres COPY for faster datapusher --- datapusher/jobs.py | 84 +++++++++++++++++++++++++++++++++++++++++----- requirements.txt | 2 ++ 2 files changed, 77 insertions(+), 9 deletions(-) diff --git a/datapusher/jobs.py b/datapusher/jobs.py index 3f6c309..a5233b2 100644 --- a/datapusher/jobs.py +++ b/datapusher/jobs.py @@ -22,6 +22,9 @@ import ckanserviceprovider.job as job import ckanserviceprovider.util as util from ckanserviceprovider import web +import psycopg2 +import csv +import six if locale.getdefaultlocale()[0]: lang, encoding = locale.getdefaultlocale() @@ -33,6 +36,8 @@ CHUNK_SIZE = web.app.config.get('CHUNK_SIZE') or 16384 CHUNK_INSERT_ROWS = web.app.config.get('CHUNK_INSERT_ROWS') or 250 DOWNLOAD_TIMEOUT = web.app.config.get('DOWNLOAD_TIMEOUT') or 30 +COPY_MODE = web.app.config.get('COPY_MODE') or False +COPY_WRITE_ENGINE_URL = web.app.config.get('COPY_WRITE_ENGINE_URL') if web.app.config.get('SSL_VERIFY') in ['False', 'FALSE', '0', False, 0]: SSL_VERIFY = False @@ -503,17 +508,78 @@ def row_iterator(): if dry_run: return headers_dicts, result - count = 0 - for i, chunk in enumerate(chunky(result, CHUNK_INSERT_ROWS)): - records, is_it_the_last_chunk = chunk - count += len(records) - logger.info('Saving chunk {number} {is_last}'.format( - number=i, is_last='(last)' if is_it_the_last_chunk else '')) + if not COPY_MODE: + count = 0 + for i, chunk in enumerate(chunky(result, CHUNK_INSERT_ROWS)): + records, is_it_the_last_chunk = chunk + count += len(records) + logger.info('Saving chunk {number} {is_last}'.format( + number=i, is_last='(last)' if is_it_the_last_chunk else '')) + send_resource_to_datastore(resource, headers_dicts, records, + is_it_the_last_chunk, api_key, ckan_url) + + logger.info('Successfully pushed {n} entries to "{res_id}".'.format( + n=count, res_id=resource_id)) + else: + # use Postgres COPY so its much faster + logger.info('Copying to database...') + timer_start = time.perf_counter() + + # first, let's create an empty datastore table + # with the guessed data types + records = [{}] send_resource_to_datastore(resource, headers_dicts, records, - is_it_the_last_chunk, api_key, ckan_url) + is_it_the_last_chunk, api_key, ckan_url) - logger.info('Successfully pushed {n} entries to "{res_id}".'.format( - n=count, res_id=resource_id)) + # Guess the delimiter used in the file for copy + with open(tmp, 'rb') as f: + header_line = f.readline() + try: + sniffer = csv.Sniffer() + delimiter = sniffer.sniff(six.ensure_text(header_line)).delimiter + except csv.Error: + logger.warning('Could not determine delimiter from file, use default ","') + delimiter = ',' + + # now copy from file + try: + raw_connection = psycopg2.connect(COPY_WRITE_ENGINE_URL) + except psycopg2.Error as e: + error_str = str(e) + logger.warning(error_str) + rowcount = 0 + else: + try: + cur = raw_connection.cursor() + try: + copy_sql = "COPY \"{resource_id}\" ({column_names}) " + "FROM STDIN " + "WITH (DELIMITER '{delimiter}', FORMAT csv, HEADER 1, " + " ENCODING '{encoding}');" + .format( + resource_id=resource_id, + column_names=', '.join(['"{}"'.format(h['id']) + for h in headers_dicts]), + delimiter=delimiter, + encoding='UTF8', + ) + logger.info(copy_sql) + with open(tmp, 'rb') as f: + try: + cur.copy_expert(copy_sql, f) + except psycopg2.DataError as e: + error_str = str(e) + 
logger.warning(error_str) + finally: + rowcount = cur.rowcount + cur.close() + finally: + raw_connection.commit() + raw_connection.close() + + timer_stop = time.perf_counter() + logger.info('...copying done. Copied {n} entries to "{res_id}" in {elapsed} seconds.'.format( + n=rowcount, res_id=resource_id, elapsed=timer_stop - timer_start)) if data.get('set_url_type', False): update_resource(resource, api_key, ckan_url) diff --git a/requirements.txt b/requirements.txt index 483fa21..156d7dd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,5 @@ html5lib==1.0.1 messytables==0.15.2 certifi requests[security]==2.24.0 +psycopg2 +six From 12b750f1128c6c98fdb94063a6fa2098fb60a19b Mon Sep 17 00:00:00 2001 From: Joel Natividad <1980690+jqnatividad@users.noreply.github.com> Date: Sat, 2 Jan 2021 19:14:13 -0500 Subject: [PATCH 2/7] Typos; PEP8; Analyze --- datapusher/jobs.py | 73 ++++++++++++++++++++++++++++------------------ 1 file changed, 44 insertions(+), 29 deletions(-) diff --git a/datapusher/jobs.py b/datapusher/jobs.py index a5233b2..472fb14 100644 --- a/datapusher/jobs.py +++ b/datapusher/jobs.py @@ -190,7 +190,7 @@ def chunky(items, num_items_per_chunk): class DatastoreEncoder(json.JSONEncoder): - # Custon JSON encoder + # Custom JSON encoder def default(self, obj): if isinstance(obj, datetime.datetime): return obj.isoformat() @@ -261,12 +261,12 @@ def send_resource_to_datastore(resource, headers, records, check_response(r, url, 'CKAN DataStore') -def update_resource(resource, api_key, ckan_url): +def update_resource(resource, api_key, ckan_url, url_type): """ Update webstore_url and webstore_last_updated in CKAN """ - resource['url_type'] = 'datapusher' + resource['url_type'] = url_type url = get_url('resource_update', ckan_url) r = requests.post( @@ -297,7 +297,7 @@ def get_resource(resource_id, ckan_url, api_key): def validate_input(input): - # Especially validate metdata which is provided by the user + # Especially validate metadata which is provided by the user if 'metadata' not in input: raise util.JobError('Metadata missing') @@ -340,7 +340,7 @@ def push_to_datastore(task_id, input, dry_run=False): try: resource = get_resource(resource_id, ckan_url, api_key) - except util.JobError as e: + except util.JobError: # try again in 5 seconds just incase CKAN is slow at adding resource time.sleep(5) resource = get_resource(resource_id, ckan_url, api_key) @@ -384,7 +384,7 @@ def push_to_datastore(task_id, input, dry_run=False): except ValueError: pass - tmp = tempfile.TemporaryFile() + tmp = tempfile.NamedTemporaryFile() length = 0 m = hashlib.md5() for chunk in response.iter_content(CHUNK_SIZE): @@ -411,8 +411,7 @@ def push_to_datastore(task_id, input, dry_run=False): file_hash = m.hexdigest() tmp.seek(0) - if (resource.get('hash') == file_hash - and not data.get('ignore_hash')): + if (resource.get('hash') == file_hash and not data.get('ignore_hash')): logger.info("The file hash hasn't changed: {hash}.".format( hash=file_hash)) return @@ -520,6 +519,9 @@ def row_iterator(): logger.info('Successfully pushed {n} entries to "{res_id}".'.format( n=count, res_id=resource_id)) + + if data.get('set_url_type', False): + update_resource(resource, api_key, ckan_url, 'datapusher') else: # use Postgres COPY so its much faster logger.info('Copying to database...') @@ -527,12 +529,11 @@ def row_iterator(): # first, let's create an empty datastore table # with the guessed data types - records = [{}] - send_resource_to_datastore(resource, headers_dicts, records, - is_it_the_last_chunk, api_key, 
ckan_url) + send_resource_to_datastore(resource, headers_dicts, None, + False, api_key, ckan_url) # Guess the delimiter used in the file for copy - with open(tmp, 'rb') as f: + with open(tmp.name, 'rb') as f: header_line = f.readline() try: sniffer = csv.Sniffer() @@ -551,20 +552,24 @@ def row_iterator(): else: try: cur = raw_connection.cursor() + # truncate table to use copy freeze option and further increase + # performance as there is no need for WAL logs this way + # https://www.cybertec-postgresql.com/en/loading-data-in-the-most-efficient-way/ + # https://www.postgresql.org/docs/9.1/populate.html#POPULATE-COPY-FROM + cur.execute('TRUNCATE TABLE \"{resource_id}\";'.format( + resource_id=resource_id)) try: - copy_sql = "COPY \"{resource_id}\" ({column_names}) " - "FROM STDIN " - "WITH (DELIMITER '{delimiter}', FORMAT csv, HEADER 1, " - " ENCODING '{encoding}');" - .format( - resource_id=resource_id, - column_names=', '.join(['"{}"'.format(h['id']) - for h in headers_dicts]), - delimiter=delimiter, - encoding='UTF8', - ) + copy_sql = ("COPY \"{resource_id}\" ({column_names}) FROM STDIN " + "WITH (DELIMITER '{delimiter}', FORMAT csv, FREEZE 1, " + "HEADER 1, ENCODING '{encoding}');").format( + resource_id=resource_id, + column_names=', '.join(['"{}"'.format(h['id']) + for h in headers_dicts]), + delimiter=delimiter, + encoding='UTF8', + ) logger.info(copy_sql) - with open(tmp, 'rb') as f: + with open(tmp.name, 'rb') as f: try: cur.copy_expert(copy_sql, f) except psycopg2.DataError as e: @@ -572,14 +577,24 @@ def row_iterator(): logger.warning(error_str) finally: rowcount = cur.rowcount - cur.close() finally: raw_connection.commit() + raw_connection.set_isolation_level( + psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) + cur = raw_connection.cursor() + logger.info('Vacuum Analyzing table...') + cur.execute('VACUUM ANALYZE \"{resource_id}\";'.format( + resource_id=resource_id)) raw_connection.close() - timer_stop = time.perf_counter() + elapsed = time.perf_counter() - timer_start logger.info('...copying done. 
Copied {n} entries to "{res_id}" in {elapsed} seconds.'.format( - n=rowcount, res_id=resource_id, elapsed=timer_stop - timer_start)) + n='{:,}'.format(rowcount), res_id=resource_id, elapsed='{:,.2f}'.format(elapsed))) + + send_resource_to_datastore(resource, headers_dicts, None, + True, api_key, ckan_url) + + resource['datastore_active'] = True + update_resource(resource, api_key, ckan_url, 'upload') - if data.get('set_url_type', False): - update_resource(resource, api_key, ckan_url) + tmp.close() # close temporary file From b1e74131e46bef8ad832890e337d83ac1321238f Mon Sep 17 00:00:00 2001 From: Joel Natividad <1980690+jqnatividad@users.noreply.github.com> Date: Sun, 3 Jan 2021 18:16:49 -0500 Subject: [PATCH 3/7] PEP8; improve msgs; refactor COPY PEP8 reformatting; Improve Datapusher non-COPY messages; remove unnecessary TRYs in COPY mode --- datapusher/jobs.py | 91 +++++++++++++++++++++++----------------------- 1 file changed, 45 insertions(+), 46 deletions(-) diff --git a/datapusher/jobs.py b/datapusher/jobs.py index 472fb14..cddcf3e 100644 --- a/datapusher/jobs.py +++ b/datapusher/jobs.py @@ -130,7 +130,8 @@ def get_url(action, ckan_url): ckan_url=ckan_url, action=action) -def check_response(response, request_url, who, good_status=(201, 200), ignore_no_success=False): +def check_response(response, request_url, who, good_status=(201, 200), + ignore_no_success=False): """ Checks the response and raises exceptions if something went terribly wrong @@ -222,7 +223,7 @@ def datastore_resource_exists(resource_id, api_key, ckan_url): response = requests.post(search_url, verify=SSL_VERIFY, data=json.dumps({'id': resource_id, - 'limit': 0}), + 'limit': 0}), headers={'Content-Type': 'application/json', 'Authorization': api_key} ) @@ -240,8 +241,8 @@ def datastore_resource_exists(resource_id, api_key, ckan_url): 'Error getting datastore resource ({!s}).'.format(e)) -def send_resource_to_datastore(resource, headers, records, - is_it_the_last_chunk, api_key, ckan_url): +def send_resource_to_datastore(resource, headers, api_key, ckan_url, + records, is_it_the_last_chunk, ): """ Stores records in CKAN datastore """ @@ -425,7 +426,8 @@ def push_to_datastore(task_id, input, dry_run=False): tmp.seek(0) try: format = resource.get('format') - table_set = messytables.any_tableset(tmp, mimetype=format, extension=format) + table_set = messytables.any_tableset(tmp, mimetype=format, + extension=format) except: raise util.JobError(e) @@ -509,16 +511,21 @@ def row_iterator(): if not COPY_MODE: count = 0 + notify_time = timer_start = time.perf_counter() for i, chunk in enumerate(chunky(result, CHUNK_INSERT_ROWS)): records, is_it_the_last_chunk = chunk count += len(records) - logger.info('Saving chunk {number} {is_last}'.format( - number=i, is_last='(last)' if is_it_the_last_chunk else '')) - send_resource_to_datastore(resource, headers_dicts, records, - is_it_the_last_chunk, api_key, ckan_url) + if is_it_the_last_chunk or notify_time < time.perf_counter(): + logger.info('Saving chunk {number} {is_last}- {n} rows - {elapsed} seconds'.format( + number=i, is_last='(last)' if is_it_the_last_chunk else '', + n='{:,}'.format(count), elapsed='{:,.2f}'.format(time.perf_counter() - timer_start))) + notify_time += 20 + send_resource_to_datastore(resource, headers_dicts, api_key, + ckan_url, records, is_it_the_last_chunk) - logger.info('Successfully pushed {n} entries to "{res_id}".'.format( - n=count, res_id=resource_id)) + elapsed = time.perf_counter() - timer_start + logger.info('Successfully pushed {n} entries to 
"{res_id}" in {elapsed} seconds.'.format( + n='{:,}'.format(count), res_id=resource_id, elapsed='{:,.2f}'.format(elapsed))) if data.get('set_url_type', False): update_resource(resource, api_key, ckan_url, 'datapusher') @@ -529,8 +536,8 @@ def row_iterator(): # first, let's create an empty datastore table # with the guessed data types - send_resource_to_datastore(resource, headers_dicts, None, - False, api_key, ckan_url) + send_resource_to_datastore(resource, headers_dicts, api_key, ckan_url, + records=None, is_it_the_last_chunk=False) # Guess the delimiter used in the file for copy with open(tmp.name, 'rb') as f: @@ -539,7 +546,7 @@ def row_iterator(): sniffer = csv.Sniffer() delimiter = sniffer.sniff(six.ensure_text(header_line)).delimiter except csv.Error: - logger.warning('Could not determine delimiter from file, use default ","') + logger.warning('Could not determine delimiter from file, using ","') delimiter = ',' # now copy from file @@ -550,50 +557,42 @@ def row_iterator(): logger.warning(error_str) rowcount = 0 else: - try: - cur = raw_connection.cursor() - # truncate table to use copy freeze option and further increase - # performance as there is no need for WAL logs this way - # https://www.cybertec-postgresql.com/en/loading-data-in-the-most-efficient-way/ - # https://www.postgresql.org/docs/9.1/populate.html#POPULATE-COPY-FROM - cur.execute('TRUNCATE TABLE \"{resource_id}\";'.format( - resource_id=resource_id)) - try: - copy_sql = ("COPY \"{resource_id}\" ({column_names}) FROM STDIN " + cur = raw_connection.cursor() + # we truncate table to use copy freeze option and further increase + # performance as there is no need for WAL logs to be maintained + # https://www.cybertec-postgresql.com/en/loading-data-in-the-most-efficient-way/ + # https://www.postgresql.org/docs/9.1/populate.html#POPULATE-COPY-FROM + cur.execute('TRUNCATE TABLE \"{resource_id}\";'.format(resource_id=resource_id)) + + copy_sql = ("COPY \"{resource_id}\" ({column_names}) FROM STDIN " "WITH (DELIMITER '{delimiter}', FORMAT csv, FREEZE 1, " "HEADER 1, ENCODING '{encoding}');").format( resource_id=resource_id, column_names=', '.join(['"{}"'.format(h['id']) for h in headers_dicts]), delimiter=delimiter, - encoding='UTF8', - ) - logger.info(copy_sql) - with open(tmp.name, 'rb') as f: - try: - cur.copy_expert(copy_sql, f) - except psycopg2.DataError as e: - error_str = str(e) - logger.warning(error_str) - finally: + encoding='UTF8') + logger.info(copy_sql) + with open(tmp.name, 'rb') as f: + try: + cur.copy_expert(copy_sql, f) + except psycopg2.Error as e: + error_str = str(e) + logger.warning(error_str) + else: rowcount = cur.rowcount - finally: - raw_connection.commit() - raw_connection.set_isolation_level( - psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) - cur = raw_connection.cursor() - logger.info('Vacuum Analyzing table...') - cur.execute('VACUUM ANALYZE \"{resource_id}\";'.format( - resource_id=resource_id)) - raw_connection.close() + + raw_connection.commit() + raw_connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) + cur = raw_connection.cursor() + logger.info('Vacuum Analyzing table...') + cur.execute('VACUUM ANALYZE \"{resource_id}\";'.format(resource_id=resource_id)) + raw_connection.close() elapsed = time.perf_counter() - timer_start logger.info('...copying done. 
Copied {n} entries to "{res_id}" in {elapsed} seconds.'.format( n='{:,}'.format(rowcount), res_id=resource_id, elapsed='{:,.2f}'.format(elapsed))) - send_resource_to_datastore(resource, headers_dicts, None, - True, api_key, ckan_url) - resource['datastore_active'] = True update_resource(resource, api_key, ckan_url, 'upload') From af6b701feb53749403212972492d5730b5d50af1 Mon Sep 17 00:00:00 2001 From: Joel Natividad <1980690+jqnatividad@users.noreply.github.com> Date: Mon, 4 Jan 2021 10:43:29 -0500 Subject: [PATCH 4/7] Init rowcount earlier when COPY fails, we need it in the info message. --- datapusher/jobs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datapusher/jobs.py b/datapusher/jobs.py index cddcf3e..b944c8a 100644 --- a/datapusher/jobs.py +++ b/datapusher/jobs.py @@ -550,12 +550,12 @@ def row_iterator(): delimiter = ',' # now copy from file + rowcount = 0 try: raw_connection = psycopg2.connect(COPY_WRITE_ENGINE_URL) except psycopg2.Error as e: error_str = str(e) logger.warning(error_str) - rowcount = 0 else: cur = raw_connection.cursor() # we truncate table to use copy freeze option and further increase From bf9cf8b717aff0e04e15182e69efefa555f64083 Mon Sep 17 00:00:00 2001 From: Joel Natividad <1980690+jqnatividad@users.noreply.github.com> Date: Mon, 4 Jan 2021 10:55:58 -0500 Subject: [PATCH 5/7] Update_resource Going back to old behavior. Misunderstood the meaning of 'url_type' --- datapusher/jobs.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/datapusher/jobs.py b/datapusher/jobs.py index b944c8a..4fc5e70 100644 --- a/datapusher/jobs.py +++ b/datapusher/jobs.py @@ -262,12 +262,12 @@ def send_resource_to_datastore(resource, headers, api_key, ckan_url, check_response(r, url, 'CKAN DataStore') -def update_resource(resource, api_key, ckan_url, url_type): +def update_resource(resource, api_key, ckan_url): """ Update webstore_url and webstore_last_updated in CKAN """ - resource['url_type'] = url_type + resource['url_type'] = 'datapusher' url = get_url('resource_update', ckan_url) r = requests.post( @@ -528,7 +528,7 @@ def row_iterator(): n='{:,}'.format(count), res_id=resource_id, elapsed='{:,.2f}'.format(elapsed))) if data.get('set_url_type', False): - update_resource(resource, api_key, ckan_url, 'datapusher') + update_resource(resource, api_key, ckan_url) else: # use Postgres COPY so its much faster logger.info('Copying to database...') @@ -594,6 +594,6 @@ def row_iterator(): n='{:,}'.format(rowcount), res_id=resource_id, elapsed='{:,.2f}'.format(elapsed))) resource['datastore_active'] = True - update_resource(resource, api_key, ckan_url, 'upload') + update_resource(resource, api_key, ckan_url) tmp.close() # close temporary file From df83edc075af4d9c6e8407080e177d797b9284f1 Mon Sep 17 00:00:00 2001 From: Joel Natividad <1980690+jqnatividad@users.noreply.github.com> Date: Tue, 5 Jan 2021 00:12:17 -0500 Subject: [PATCH 6/7] Conditional COPY If COPY_MODE_SIZE is zero, or the filesize is less than the COPY_MODE_SIZE threshold in bytes, push thru Datastore API. Otherwise, use COPY if we have a COPY_WRITE_ENGINE_URL. Corrected minor typos and streamlined exception handling. 
--- datapusher/jobs.py | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/datapusher/jobs.py b/datapusher/jobs.py index 4fc5e70..a0f495a 100644 --- a/datapusher/jobs.py +++ b/datapusher/jobs.py @@ -25,6 +25,7 @@ import psycopg2 import csv import six +from pathlib import Path if locale.getdefaultlocale()[0]: lang, encoding = locale.getdefaultlocale() @@ -36,7 +37,7 @@ CHUNK_SIZE = web.app.config.get('CHUNK_SIZE') or 16384 CHUNK_INSERT_ROWS = web.app.config.get('CHUNK_INSERT_ROWS') or 250 DOWNLOAD_TIMEOUT = web.app.config.get('DOWNLOAD_TIMEOUT') or 30 -COPY_MODE = web.app.config.get('COPY_MODE') or False +COPY_MODE_SIZE = web.app.config.get('COPY_MODE_SIZE') or 0 COPY_WRITE_ENGINE_URL = web.app.config.get('COPY_WRITE_ENGINE_URL') if web.app.config.get('SSL_VERIFY') in ['False', 'FALSE', '0', False, 0]: @@ -481,7 +482,7 @@ def row_iterator(): result = row_iterator() ''' - Delete existing datstore resource before proceeding. Otherwise + Delete existing datastore resource before proceeding. Otherwise 'datastore_create' will append to the existing datastore. And if the fields have significantly changed, it may also fail. ''' @@ -509,7 +510,12 @@ def row_iterator(): if dry_run: return headers_dicts, result - if not COPY_MODE: + fileSize = Path(tmp.name).stat().st_size + + # If COPY_MODE_SIZE is zero, or the filesize is less than the + # COPY_MODE_SIZE threshold in bytes, push thru Datastore API. + # Otherwise, use COPY if we have a COPY_WRITE_ENGINE_URL + if not COPY_MODE_SIZE or not COPY_WRITE_ENGINE_URL or fileSize < COPY_MODE_SIZE: count = 0 notify_time = timer_start = time.perf_counter() for i, chunk in enumerate(chunky(result, CHUNK_INSERT_ROWS)): @@ -530,12 +536,11 @@ def row_iterator(): if data.get('set_url_type', False): update_resource(resource, api_key, ckan_url) else: - # use Postgres COPY so its much faster + # for larger files, use Postgres COPY as its much faster logger.info('Copying to database...') timer_start = time.perf_counter() - # first, let's create an empty datastore table - # with the guessed data types + # first, let's create an empty datastore table w/ guessed types send_resource_to_datastore(resource, headers_dicts, api_key, ckan_url, records=None, is_it_the_last_chunk=False) @@ -546,21 +551,18 @@ def row_iterator(): sniffer = csv.Sniffer() delimiter = sniffer.sniff(six.ensure_text(header_line)).delimiter except csv.Error: - logger.warning('Could not determine delimiter from file, using ","') + logger.warning('Could not determine delimiter, using ","') delimiter = ',' - # now copy from file rowcount = 0 try: raw_connection = psycopg2.connect(COPY_WRITE_ENGINE_URL) except psycopg2.Error as e: - error_str = str(e) - logger.warning(error_str) + logger.warning(str(e)) else: cur = raw_connection.cursor() - # we truncate table to use copy freeze option and further increase + # truncate table to use copy freeze option and further increase # performance as there is no need for WAL logs to be maintained - # https://www.cybertec-postgresql.com/en/loading-data-in-the-most-efficient-way/ # https://www.postgresql.org/docs/9.1/populate.html#POPULATE-COPY-FROM cur.execute('TRUNCATE TABLE \"{resource_id}\";'.format(resource_id=resource_id)) @@ -577,8 +579,7 @@ def row_iterator(): try: cur.copy_expert(copy_sql, f) except psycopg2.Error as e: - error_str = str(e) - logger.warning(error_str) + logger.warning(str(e)) else: rowcount = cur.rowcount From f9b855ecaafb51995719d61fee304c0c5cabbcec Mon Sep 17 00:00:00 2001 From: Joel Natividad 
<1980690+jqnatividad@users.noreply.github.com> Date: Fri, 15 Jan 2021 09:40:50 -0500 Subject: [PATCH 7/7] Comments; remove unneeded parm --- datapusher/jobs.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/datapusher/jobs.py b/datapusher/jobs.py index a0f495a..49faea9 100644 --- a/datapusher/jobs.py +++ b/datapusher/jobs.py @@ -567,13 +567,12 @@ def row_iterator(): cur.execute('TRUNCATE TABLE \"{resource_id}\";'.format(resource_id=resource_id)) copy_sql = ("COPY \"{resource_id}\" ({column_names}) FROM STDIN " - "WITH (DELIMITER '{delimiter}', FORMAT csv, FREEZE 1, " - "HEADER 1, ENCODING '{encoding}');").format( + "WITH (DELIMITER '{delimiter}', FORMAT CSV, FREEZE 1, " + "HEADER 1, ENCODING 'UTF8');").format( resource_id=resource_id, column_names=', '.join(['"{}"'.format(h['id']) for h in headers_dicts]), - delimiter=delimiter, - encoding='UTF8') + delimiter=delimiter) logger.info(copy_sql) with open(tmp.name, 'rb') as f: try: @@ -584,6 +583,7 @@ def row_iterator(): rowcount = cur.rowcount raw_connection.commit() + # this is needed to issue a VACUUM ANALYZE raw_connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) cur = raw_connection.cursor() logger.info('Vacuum Analyzing table...')
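
Note on configuration: the final form of this series (PATCH 6/7 onward) drops the COPY_MODE boolean in favour of a byte-size threshold, and COPY is only attempted when a write-capable datastore connection URL is configured. A minimal settings sketch follows, assuming the usual datapusher settings module; the file values below are illustrative placeholders, not values taken from the patches.

# Files at or above this size (in bytes) are loaded with Postgres COPY;
# 0 disables COPY mode, so every file goes through the Datastore API.
COPY_MODE_SIZE = 50 * 1024 * 1024  # assumed threshold, e.g. 50 MB

# Write-capable connection string for the datastore database; COPY mode
# is skipped entirely when this is left unset.
COPY_WRITE_ENGINE_URL = 'postgresql://ckan_default:pass@localhost/datastore_default'  # placeholder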
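
For reference, the COPY fast path that these patches converge on reduces to roughly the standalone sketch below. The connection URL, resource id, column list, and CSV path are placeholders, and the datastore table is assumed to already exist with the guessed column types (the patches arrange that through an empty datastore_create call before copying).

"""Standalone sketch of the datapusher COPY fast path (illustrative only)."""
import csv

import psycopg2
import psycopg2.extensions

# Placeholder values -- none of these come from the patches themselves.
COPY_WRITE_ENGINE_URL = 'postgresql://ckan_default:pass@localhost/datastore_default'
RESOURCE_ID = '00000000-0000-0000-0000-000000000000'   # datastore table name
COLUMNS = ['name', 'value']                            # guessed header ids
CSV_PATH = '/tmp/resource.csv'


def copy_csv_into_datastore(csv_path, resource_id, columns):
    # Guess the delimiter from the header line, falling back to a comma.
    with open(csv_path, 'rb') as f:
        header_line = f.readline().decode('utf-8', errors='replace')
    try:
        delimiter = csv.Sniffer().sniff(header_line).delimiter
    except csv.Error:
        delimiter = ','

    conn = psycopg2.connect(COPY_WRITE_ENGINE_URL)
    try:
        cur = conn.cursor()
        # TRUNCATE in the same transaction as the COPY lets the FREEZE
        # option skip WAL logging for the loaded rows.
        cur.execute('TRUNCATE TABLE "{0}";'.format(resource_id))
        copy_sql = (
            'COPY "{0}" ({1}) FROM STDIN '
            "WITH (DELIMITER '{2}', FORMAT CSV, FREEZE 1, "
            "HEADER 1, ENCODING 'UTF8');"
        ).format(resource_id,
                 ', '.join('"{0}"'.format(c) for c in columns),
                 delimiter)
        with open(csv_path, 'rb') as f:
            cur.copy_expert(copy_sql, f)
        rowcount = cur.rowcount
        conn.commit()

        # VACUUM cannot run inside a transaction block, so switch to
        # autocommit before refreshing the planner statistics.
        conn.set_isolation_level(
            psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        conn.cursor().execute('VACUUM ANALYZE "{0}";'.format(resource_id))
        return rowcount
    finally:
        conn.close()


if __name__ == '__main__':
    print(copy_csv_into_datastore(CSV_PATH, RESOURCE_ID, COLUMNS))

TRUNCATE immediately before COPY ... FREEZE lets PostgreSQL skip WAL logging for the loaded rows, and the closing VACUUM ANALYZE must run with autocommit enabled because VACUUM cannot execute inside a transaction block; the COPY itself is what replaces the chunked datastore_create API calls of the non-COPY path.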