From 6c1e7b5e30601106fc7d06dc0c8e308c4151fc28 Mon Sep 17 00:00:00 2001 From: Alex Varghese Date: Thu, 18 Jun 2020 17:22:55 -0400 Subject: [PATCH 1/4] adding a new method to retry jsonDecode errors --- tap_sendgrid/http.py | 14 ++++++++++++++ tap_sendgrid/syncs.py | 4 ++-- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/tap_sendgrid/http.py b/tap_sendgrid/http.py index 762df6e..31b3930 100644 --- a/tap_sendgrid/http.py +++ b/tap_sendgrid/http.py @@ -2,6 +2,7 @@ import singer import time +from simplejson import JSONDecodeError from singer import metrics session = requests.Session() @@ -27,6 +28,19 @@ def end_of_records_check(r): return False +def json_parse_retry(tap_stream_id, url, config, params=None): + retries = 3 + attempt = 1 + while retries >= attempt: + r = retry_get(tap_stream_id, url, config, params=None) + try: + r.json().get('recipients') + except JSONDecodeError as e: + attempt += 1 + continue + break + return r + def retry_get(tap_stream_id, url, config, params=None): """Wrap certain streams in a retry wrapper for frequent 500s""" retries = 20 diff --git a/tap_sendgrid/syncs.py b/tap_sendgrid/syncs.py index 98c081a..7ae1391 100644 --- a/tap_sendgrid/syncs.py +++ b/tap_sendgrid/syncs.py @@ -3,7 +3,7 @@ from simplejson.scanner import JSONDecodeError from .streams import IDS -from .http import end_of_records_check, retry_get +from .http import end_of_records_check, retry_get, json_parse_retry from .utils import ( trimmed_records, trim_members_all, get_results_from_payload, safe_update_dict, @@ -175,7 +175,7 @@ def get_using_paged(self, stream, add_params=None, url_key=None): 'page_size': page_size } safe_update_dict(params, add_params) - r = retry_get(stream.tap_stream_id, + r = json_parse_retry(stream.tap_stream_id, endpoint, self.ctx.config, params=params) From 5d8461e714c0dbecf5beba982459c49a5ee10ae1 Mon Sep 17 00:00:00 2001 From: Alex Varghese Date: Fri, 19 Jun 2020 12:12:40 -0400 Subject: [PATCH 2/4] refactoring exception error --- tap_sendgrid/http.py | 13 ------------- tap_sendgrid/syncs.py | 23 +++++++++++++++++------ 2 files changed, 17 insertions(+), 19 deletions(-) diff --git a/tap_sendgrid/http.py b/tap_sendgrid/http.py index 31b3930..2f1eb14 100644 --- a/tap_sendgrid/http.py +++ b/tap_sendgrid/http.py @@ -28,19 +28,6 @@ def end_of_records_check(r): return False -def json_parse_retry(tap_stream_id, url, config, params=None): - retries = 3 - attempt = 1 - while retries >= attempt: - r = retry_get(tap_stream_id, url, config, params=None) - try: - r.json().get('recipients') - except JSONDecodeError as e: - attempt += 1 - continue - break - return r - def retry_get(tap_stream_id, url, config, params=None): """Wrap certain streams in a retry wrapper for frequent 500s""" retries = 20 diff --git a/tap_sendgrid/syncs.py b/tap_sendgrid/syncs.py index 7ae1391..962154f 100644 --- a/tap_sendgrid/syncs.py +++ b/tap_sendgrid/syncs.py @@ -168,6 +168,7 @@ def get_using_paged(self, stream, add_params=None, url_key=None): page = 1 page_size = 1000 endpoint = stream.endpoint.format(url_key) if url_key else stream.endpoint + page_attempts = 0 while True: params = { @@ -175,15 +176,25 @@ def get_using_paged(self, stream, add_params=None, url_key=None): 'page_size': page_size } safe_update_dict(params, add_params) - r = json_parse_retry(stream.tap_stream_id, + r = retry_get(stream.tap_stream_id, endpoint, self.ctx.config, params=params) - yield r - if not end_of_records_check(r): - page += 1 - else: - break + try: + yield r.json() + if not end_of_records_check(r): + page_attempts = 0 + page += 1 + else: + break + except JSONDecodeError: + page_attempts += 1 + if page_attempts > 3: + continue + else: + logger.error(f'Status code throwing error {r.status_code}') + logger.error(f'Content for invalid request:\n{r.content}') + raise ValueError('Error parsing file...') def get_using_offset(self, stream, start, end, url_key=None): offset = 0 From 3d5c168603c5d654732ff0bcfe638a82f647ab85 Mon Sep 17 00:00:00 2001 From: Alex Varghese Date: Fri, 19 Jun 2020 12:14:10 -0400 Subject: [PATCH 3/4] removing unnecessary imports --- tap_sendgrid/http.py | 1 - tap_sendgrid/syncs.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/tap_sendgrid/http.py b/tap_sendgrid/http.py index 2f1eb14..762df6e 100644 --- a/tap_sendgrid/http.py +++ b/tap_sendgrid/http.py @@ -2,7 +2,6 @@ import singer import time -from simplejson import JSONDecodeError from singer import metrics session = requests.Session() diff --git a/tap_sendgrid/syncs.py b/tap_sendgrid/syncs.py index 962154f..2d4a99a 100644 --- a/tap_sendgrid/syncs.py +++ b/tap_sendgrid/syncs.py @@ -3,7 +3,7 @@ from simplejson.scanner import JSONDecodeError from .streams import IDS -from .http import end_of_records_check, retry_get, json_parse_retry +from .http import end_of_records_check, retry_get from .utils import ( trimmed_records, trim_members_all, get_results_from_payload, safe_update_dict, From 81cb309aba36b0e8c262685310b4e5f552b6361f Mon Sep 17 00:00:00 2001 From: Alex Varghese Date: Mon, 22 Jun 2020 10:01:33 -0400 Subject: [PATCH 4/4] refactoring method --- tap_sendgrid/syncs.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tap_sendgrid/syncs.py b/tap_sendgrid/syncs.py index 2d4a99a..972e433 100644 --- a/tap_sendgrid/syncs.py +++ b/tap_sendgrid/syncs.py @@ -57,7 +57,7 @@ def write_paged_records(self, stream, schema, for res in self.get_using_paged(stream, add_params=params, url_key=url_key): try: - results = res.json().get('recipients') + results = res.get('recipients') except JSONDecodeError as e: logger.info(f'Response: {res}') raise e @@ -182,19 +182,19 @@ def get_using_paged(self, stream, add_params=None, url_key=None): params=params) try: yield r.json() - if not end_of_records_check(r): - page_attempts = 0 - page += 1 - else: - break except JSONDecodeError: page_attempts += 1 - if page_attempts > 3: + if page_attempts < 3: continue else: logger.error(f'Status code throwing error {r.status_code}') logger.error(f'Content for invalid request:\n{r.content}') raise ValueError('Error parsing file...') + if not end_of_records_check(r): + page_attempts = 0 + page += 1 + else: + break def get_using_offset(self, stream, start, end, url_key=None): offset = 0